/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.dispatcher;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.dispatcher.AsyncQueryHandler;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
 * {@link AsyncQueryHandler} for index DML statements (DROP, ALTER and VACUUM).
 *
 * <p>The index operation is executed inline inside {@link #submit}; its outcome (success or
 * failure) is then written as an {@link IndexDMLResult} through the {@link StateStore} into the
 * datasource's result index, from where {@link #getResponseFromResultIndex} reads it back.
 */
public class IndexDMLHandler
extends AsyncQueryHandler {
    private static final Logger LOG = LogManager.getLogger();

    /** Job id marker recognised by {@link #isIndexDMLQuery(String)}. */
    public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";

    /** Job id placed in every {@link DispatchQueryResponse} produced by this handler. */
    public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

    private final EMRServerlessClient emrServerlessClient;
    private final JobExecutionResponseReader jobExecutionResponseReader;
    private final FlintIndexMetadataService flintIndexMetadataService;
    private final StateStore stateStore;
    private final Client client;

    /**
     * Tells whether a job id is one of the index DML marker ids.
     *
     * @param jobId job id to test; {@code null} yields {@code false}
     * @return {@code true} when {@code jobId} equals {@link #DROP_INDEX_JOB_ID} or
     *     {@link #DML_QUERY_JOB_ID}, ignoring case
     */
    public static boolean isIndexDMLQuery(String jobId) {
        return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
    }

    /**
     * Runs the index operation described by the context and records its outcome.
     *
     * <p>Any exception raised while resolving the index metadata or applying the operation is
     * caught here: it is logged and a FAILED result carrying the exception message is stored
     * instead of a SUCCESS one. An exception thrown while storing the result itself is not caught.
     *
     * @param dispatchQueryRequest the request being dispatched
     * @param context dispatch context supplying the datasource metadata and index query details
     * @return a response carrying the new query id, {@link #DML_QUERY_JOB_ID} and the result index
     */
    @Override
    public DispatchQueryResponse submit(DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
        DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
        long startTime = System.currentTimeMillis();
        try {
            IndexQueryDetails indexDetails = context.getIndexQueryDetails();
            FlintIndexMetadata indexMetadata = this.getFlintIndexMetadata(indexDetails);
            this.executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
            AsyncQueryId asyncQueryId = this.storeIndexDMLResult(dispatchQueryRequest, dataSourceMetadata, JobRunState.SUCCESS.toString(), "", startTime);
            return new DispatchQueryResponse(asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
        }
        catch (Exception e) {
            // Pass the throwable so the exception type and stack trace reach the log;
            // logging only the message loses both and prints "null" for message-less exceptions.
            LOG.error(e.getMessage(), e);
            AsyncQueryId asyncQueryId = this.storeIndexDMLResult(dispatchQueryRequest, dataSourceMetadata, JobRunState.FAILED.toString(), e.getMessage(), startTime);
            return new DispatchQueryResponse(asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
        }
    }

    /**
     * Creates a new query id and writes an {@link IndexDMLResult} for it to the result index.
     *
     * @param dispatchQueryRequest request whose datasource name is recorded in the result
     * @param dataSourceMetadata supplies the name used for the query id and the result index
     * @param status status string to record
     * @param error error text to record (empty on success)
     * @param startTime {@link System#currentTimeMillis()} value taken when the submit began
     * @return the id generated for this query
     */
    private AsyncQueryId storeIndexDMLResult(DispatchQueryRequest dispatchQueryRequest, DataSourceMetadata dataSourceMetadata, String status, String error, long startTime) {
        AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
        // Read the clock once so the elapsed time and the timestamp describe the same instant.
        long now = System.currentTimeMillis();
        IndexDMLResult indexDMLResult = new IndexDMLResult(asyncQueryId.getId(), status, error, dispatchQueryRequest.getDatasource(), now - startTime, now);
        StateStore.createIndexDMLResult(this.stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
        return asyncQueryId;
    }

    /**
     * Applies the Flint index operation matching the query's action type.
     *
     * @param dispatchQueryRequest request supplying the datasource name
     * @param indexQueryDetails supplies the action type and, for ALTER, the Flint index options
     * @param indexMetadata metadata of the index the operation is applied to
     * @throws IllegalStateException if the action type is not DROP, ALTER or VACUUM
     */
    private void executeIndexOp(DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails, FlintIndexMetadata indexMetadata) {
        switch (indexQueryDetails.getIndexQueryActionType()) {
            case DROP: {
                FlintIndexOpDrop dropOp = new FlintIndexOpDrop(this.stateStore, dispatchQueryRequest.getDatasource(), this.emrServerlessClient);
                dropOp.apply(indexMetadata);
                break;
            }
            case ALTER: {
                FlintIndexOpAlter flintIndexOpAlter = new FlintIndexOpAlter(indexQueryDetails.getFlintIndexOptions(), this.stateStore, dispatchQueryRequest.getDatasource(), this.emrServerlessClient, this.flintIndexMetadataService);
                flintIndexOpAlter.apply(indexMetadata);
                break;
            }
            case VACUUM: {
                // A drop is attempted first, best effort; the vacuum runs regardless of its outcome.
                FlintIndexOpDrop tryDropOp = new FlintIndexOpDrop(this.stateStore, dispatchQueryRequest.getDatasource(), this.emrServerlessClient);
                try {
                    tryDropOp.apply(indexMetadata);
                }
                catch (IllegalStateException ignored) {
                    // Deliberately not rethrown. NOTE(review): presumably the drop rejects an index
                    // whose current state does not allow dropping (e.g. already dropped) - confirm.
                    LOG.debug("Drop before vacuum was skipped: {}", ignored.getMessage());
                }
                FlintIndexOpVacuum indexVacuumOp = new FlintIndexOpVacuum(this.stateStore, dispatchQueryRequest.getDatasource(), this.client);
                indexVacuumOp.apply(indexMetadata);
                break;
            }
            default: {
                throw new IllegalStateException(String.format("IndexQueryActionType: %s is not supported in IndexDMLHandler.", indexQueryDetails.getIndexQueryActionType()));
            }
        }
    }

    /**
     * Looks up the Flint metadata of the index named by the query details.
     *
     * @param indexDetails supplies the OpenSearch index name to look up
     * @return the metadata the service returned for that index name
     * @throws IllegalStateException if the service returned no entry for the index name
     */
    private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) {
        String indexName = indexDetails.openSearchIndexName();
        Map<String, FlintIndexMetadata> indexMetadataMap = this.flintIndexMetadataService.getFlintIndexMetadata(indexName);
        if (!indexMetadataMap.containsKey(indexName)) {
            throw new IllegalStateException(String.format("Couldn't fetch flint index: %s details", indexName));
        }
        return indexMetadataMap.get(indexName);
    }

    /**
     * Reads the stored result for the query from its result index.
     *
     * @param asyncQueryJobMetadata supplies the query id and the result index to read from
     * @return whatever the response reader returns for that query id
     */
    @Override
    protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        String queryId = asyncQueryJobMetadata.getQueryId().getId();
        return this.jobExecutionResponseReader.getResultWithQueryId(queryId, asyncQueryJobMetadata.getResultIndex());
    }

    /**
     * Builds a fixed response reporting the RUNNING statement state with an empty error.
     *
     * <p>NOTE(review): presumably used by the base class when no result document is available
     * yet - confirm against {@link AsyncQueryHandler}.
     *
     * @param asyncQueryJobMetadata unused
     * @return a JSON object with the keys {@code status} and {@code error}
     */
    @Override
    protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        JSONObject result = new JSONObject();
        result.put("status", StatementState.RUNNING.getState());
        result.put("error", "");
        return result;
    }

    /**
     * Index DML queries cannot be cancelled.
     *
     * @param asyncQueryJobMetadata unused
     * @return never returns normally
     * @throws IllegalArgumentException always
     */
    @Override
    public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        throw new IllegalArgumentException("can't cancel index DML query");
    }

    /**
     * Creates a handler with the collaborators it delegates to.
     *
     * @param emrServerlessClient passed to the drop and alter index operations
     * @param jobExecutionResponseReader used to read stored results from the result index
     * @param flintIndexMetadataService used to look up index metadata and by the alter operation
     * @param stateStore passed to every index operation and used to store DML results
     * @param client passed to the vacuum index operation
     */
    @Generated
    public IndexDMLHandler(EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataService flintIndexMetadataService, StateStore stateStore, Client client) {
        this.emrServerlessClient = emrServerlessClient;
        this.jobExecutionResponseReader = jobExecutionResponseReader;
        this.flintIndexMetadataService = flintIndexMetadataService;
        this.stateStore = stateStore;
        this.client = client;
    }
}

