/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.dispatcher;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.Generated;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.SQLQueryUtils;

/**
 * Dispatches async queries either to EMR Serverless (as Spark jobs) or, for
 * {@code DROP INDEX} statements, handles them locally by cancelling the
 * refresh job (if any) and deleting the backing OpenSearch index.
 */
public class SparkQueryDispatcher {
    private static final Logger LOG = LogManager.getLogger();
    public static final String INDEX_TAG_KEY = "index";
    public static final String DATASOURCE_TAG_KEY = "datasource";
    public static final String SCHEMA_TAG_KEY = "schema";
    public static final String TABLE_TAG_KEY = "table";
    public static final String CLUSTER_NAME_TAG_KEY = "cluster";
    private final EMRServerlessClient emrServerlessClient;
    private final DataSourceService dataSourceService;
    private final DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper;
    private final JobExecutionResponseReader jobExecutionResponseReader;
    private final FlintIndexMetadataReader flintIndexMetadataReader;
    private final Client client;

    /**
     * Routes a query request to the matching handler.
     *
     * <p>SQL requests are inspected further (index / drop-index / plain query);
     * every other language type is submitted as a non-index query.
     *
     * @param dispatchQueryRequest the request to dispatch
     * @return the response describing the started (or simulated) job
     */
    public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
        if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
            return this.handleSQLQuery(dispatchQueryRequest);
        }
        return this.handleNonIndexQuery(dispatchQueryRequest);
    }

    /**
     * Builds the JSON response for a previously dispatched query.
     *
     * <p>Drop-index queries never ran a job, so their result is decoded from the
     * job id itself. Otherwise the result document is read from the result index:
     * when it contains a {@code "data"} object, {@code "status"} and {@code "error"}
     * are copied from it (defaulting to {@code FAILED} and the empty string);
     * when it does not, the status is taken from the EMR Serverless job run state
     * and the error is left empty.
     *
     * @param asyncQueryJobMetadata metadata identifying the job and its result index
     * @return a JSON object that always carries {@code "status"}; {@code "error"} is set on
     *     every path except a successful drop-index result
     */
    public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        if (asyncQueryJobMetadata.isDropIndexQuery()) {
            return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result();
        }
        JSONObject result = this.jobExecutionResponseReader.getResultFromOpensearchIndex(asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
        if (result.has("data")) {
            JSONObject items = result.getJSONObject("data");
            // A result document without an explicit status is treated as a failure.
            result.put("status", items.optString("status", JobRunState.FAILED.toString()));
            result.put("error", items.optString("error", ""));
        } else {
            // No result document content yet: report the live job state instead.
            GetJobRunResult getJobRunResult = this.emrServerlessClient.getJobRunResult(asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
            result.put("status", getJobRunResult.getJobRun().getState());
            result.put("error", "");
        }
        return result;
    }

    /**
     * Cancels the EMR Serverless job run referenced by the given metadata.
     *
     * @param asyncQueryJobMetadata metadata holding the application id and job id
     * @return the job run id reported by the cancel call
     */
    public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        CancelJobRunResult cancelJobRunResult = this.emrServerlessClient.cancelJobRun(asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
        return cancelJobRunResult.getJobRunId();
    }

    /**
     * Classifies a SQL request as drop-index, index, or plain query and delegates accordingly.
     */
    private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
        if (!SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
            return this.handleNonIndexQuery(dispatchQueryRequest);
        }
        IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
        if (indexDetails.isDropIndex()) {
            return this.handleDropIndexQuery(dispatchQueryRequest, indexDetails);
        }
        return this.handleIndexQuery(dispatchQueryRequest, indexDetails);
    }

    /**
     * Authorizes the datasource and submits an index query as a Spark job,
     * tagging the job with index, table and schema names.
     */
    private DispatchQueryResponse handleIndexQuery(DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
        FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
        DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
        this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
        String jobName = dispatchQueryRequest.getClusterName() + ":index-query";
        Map<String, String> tags = SparkQueryDispatcher.getDefaultTagsForJobSubmission(dispatchQueryRequest);
        tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
        tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
        tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
        // Reuse the metadata fetched above rather than looking the datasource up a second time.
        String sparkSubmitParameters = SparkSubmitParameters.Builder.builder()
                .dataSource(dataSourceMetadata)
                .structuredStreaming(indexDetails.getAutoRefresh())
                .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
                .build()
                .toString();
        StartJobRequest startJobRequest = new StartJobRequest(dispatchQueryRequest.getQuery(), jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), sparkSubmitParameters, tags, indexDetails.getAutoRefresh(), dataSourceMetadata.getResultIndex());
        String jobId = this.emrServerlessClient.startJobRun(startJobRequest);
        return new DispatchQueryResponse(jobId, false, dataSourceMetadata.getResultIndex());
    }

    /**
     * Authorizes the datasource and submits a non-index query as a Spark job
     * with only the default cluster/datasource tags.
     */
    private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQueryRequest) {
        DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
        this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
        String jobName = dispatchQueryRequest.getClusterName() + ":non-index-query";
        Map<String, String> tags = SparkQueryDispatcher.getDefaultTagsForJobSubmission(dispatchQueryRequest);
        // Reuse the metadata fetched above rather than looking the datasource up a second time.
        String sparkSubmitParameters = SparkSubmitParameters.Builder.builder()
                .dataSource(dataSourceMetadata)
                .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
                .build()
                .toString();
        StartJobRequest startJobRequest = new StartJobRequest(dispatchQueryRequest.getQuery(), jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), sparkSubmitParameters, tags, false, dataSourceMetadata.getResultIndex());
        String jobId = this.emrServerlessClient.startJobRun(startJobRequest);
        return new DispatchQueryResponse(jobId, false, dataSourceMetadata.getResultIndex());
    }

    /**
     * Handles a {@code DROP INDEX} statement without starting a Spark job.
     *
     * <p>If the index is auto-refreshing, its job run is cancelled first. The
     * backing OpenSearch index is deleted in a {@code finally} block, so the
     * deletion is attempted even when the cancel call throws; in that case the
     * cancel exception still propagates to the caller after the deletion attempt.
     *
     * @return a response whose job id encodes the drop outcome (see {@link DropIndexResult})
     */
    private DispatchQueryResponse handleDropIndexQuery(DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
        DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
        this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
        FlintIndexMetadata indexMetadata = this.flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
        String status;
        try {
            // Only cancel when the index is auto-refreshing.
            if (indexMetadata.isAutoRefresh()) {
                this.emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
            }
        } finally {
            status = this.deleteOpenSearchIndex(indexDetails.openSearchIndexName());
        }
        return new DispatchQueryResponse(new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex());
    }

    /**
     * Deletes the named OpenSearch index and waits for the response.
     *
     * @param indexName name of the index to delete
     * @return {@code SUCCESS} when the delete call returned a response,
     *     {@code FAILED} when waiting for it was interrupted or it failed
     */
    private String deleteOpenSearchIndex(String indexName) {
        try {
            AcknowledgedResponse response = this.client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
            if (!response.isAcknowledged()) {
                // NOTE(review): an unacknowledged delete is logged but still reported as
                // SUCCESS, matching the pre-existing behaviour - confirm this is intended.
                LOG.error("failed to delete index");
            }
            return JobRunState.SUCCESS.toString();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("failed to delete index", e);
        } catch (ExecutionException e) {
            LOG.error("failed to delete index", e);
        }
        return JobRunState.FAILED.toString();
    }

    /**
     * Creates the mutable tag map attached to every submitted job, pre-populated
     * with the cluster name and datasource name from the request.
     */
    private static Map<String, String> getDefaultTagsForJobSubmission(DispatchQueryRequest dispatchQueryRequest) {
        Map<String, String> tags = new HashMap<>();
        tags.put(CLUSTER_NAME_TAG_KEY, dispatchQueryRequest.getClusterName());
        tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
        return tags;
    }

    @Generated
    public SparkQueryDispatcher(EMRServerlessClient emrServerlessClient, DataSourceService dataSourceService, DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper, JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataReader flintIndexMetadataReader, Client client) {
        this.emrServerlessClient = emrServerlessClient;
        this.dataSourceService = dataSourceService;
        this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper;
        this.jobExecutionResponseReader = jobExecutionResponseReader;
        this.flintIndexMetadataReader = flintIndexMetadataReader;
        this.client = client;
    }

    /**
     * Outcome of a drop-index request, carried inside a synthetic job id.
     *
     * <p>The job id is the Base64 encoding of {@value #PREFIX_LEN} random
     * alphanumeric characters followed by the status string.
     */
    public static class DropIndexResult {
        private static final int PREFIX_LEN = 10;
        private final String status;

        /**
         * Recovers the status from a job id produced by {@link #toJobId()}.
         *
         * @param jobId Base64-encoded job id
         * @return a result holding everything after the random prefix
         */
        public static DropIndexResult fromJobId(String jobId) {
            // Decode with the same charset toJobId() encodes with, not the platform default.
            String decoded = new String(Base64.getDecoder().decode(jobId), StandardCharsets.UTF_8);
            return new DropIndexResult(decoded.substring(PREFIX_LEN));
        }

        /**
         * Encodes this result as a job id: random prefix + status, Base64-encoded as UTF-8.
         *
         * @return the synthetic job id
         */
        public String toJobId() {
            String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + this.status;
            return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Renders this result as a query-response JSON object.
         *
         * <p>On success (case-insensitive match on {@code SUCCESS}) the object has
         * {@code "status"} and a placeholder {@code "data"} object with empty
         * {@code "result"} and {@code "schema"} arrays; otherwise it has
         * {@code "status"} and a fixed {@code "error"} message.
         *
         * @return the JSON response
         */
        public JSONObject result() {
            JSONObject result = new JSONObject();
            result.put("status", this.status);
            if (JobRunState.SUCCESS.toString().equalsIgnoreCase(this.status)) {
                JSONObject dummyData = new JSONObject();
                dummyData.put("result", new JSONArray());
                dummyData.put(SparkQueryDispatcher.SCHEMA_TAG_KEY, new JSONArray());
                dummyData.put("applicationId", "fakeDropIndexApplicationId");
                result.put("data", dummyData);
            } else {
                result.put("error", "failed to drop index");
            }
            return result;
        }

        @Generated
        public String getStatus() {
            return this.status;
        }

        @Generated
        public DropIndexResult(String status) {
            this.status = status;
        }
    }
}

