/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.asyncquery;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.ArrayList;
import java.util.Optional;
import lombok.Generated;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.functions.response.DefaultSparkSqlFunctionResponseHandle;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;

public class AsyncQueryExecutorServiceImpl
implements AsyncQueryExecutorService {
    private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;
    private SparkQueryDispatcher sparkQueryDispatcher;
    private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;

    @Override
    public CreateAsyncQueryResponse createAsyncQuery(CreateAsyncQueryRequest createAsyncQueryRequest) {
        SparkExecutionEngineConfig sparkExecutionEngineConfig = this.sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
        DispatchQueryResponse dispatchQueryResponse = this.sparkQueryDispatcher.dispatch(new DispatchQueryRequest(sparkExecutionEngineConfig.getApplicationId(), createAsyncQueryRequest.getQuery(), createAsyncQueryRequest.getDatasource(), createAsyncQueryRequest.getLang(), sparkExecutionEngineConfig.getExecutionRoleARN(), sparkExecutionEngineConfig.getClusterName(), sparkExecutionEngineConfig.getSparkSubmitParameters(), createAsyncQueryRequest.getSessionId()));
        this.asyncQueryJobMetadataStorageService.storeJobMetadata(new AsyncQueryJobMetadata(dispatchQueryResponse.getQueryId(), sparkExecutionEngineConfig.getApplicationId(), dispatchQueryResponse.getJobId(), dispatchQueryResponse.getResultIndex(), dispatchQueryResponse.getSessionId(), dispatchQueryResponse.getDatasourceName(), dispatchQueryResponse.getJobType(), dispatchQueryResponse.getIndexName()));
        return new CreateAsyncQueryResponse(dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
    }

    @Override
    public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
        Optional<AsyncQueryJobMetadata> jobMetadata = this.asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
        if (jobMetadata.isPresent()) {
            String sessionId = jobMetadata.get().getSessionId();
            JSONObject jsonObject = this.sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
            if (JobRunState.SUCCESS.toString().equals(jsonObject.getString("status"))) {
                DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle = new DefaultSparkSqlFunctionResponseHandle(jsonObject);
                ArrayList<ExprValue> result = new ArrayList<ExprValue>();
                while (sparkSqlFunctionResponseHandle.hasNext()) {
                    result.add(sparkSqlFunctionResponseHandle.next());
                }
                return new AsyncQueryExecutionResponse(JobRunState.SUCCESS.toString(), sparkSqlFunctionResponseHandle.schema(), result, null, sessionId);
            }
            return new AsyncQueryExecutionResponse(jsonObject.optString("status", JobRunState.FAILED.toString()), null, null, jsonObject.optString("error", ""), sessionId);
        }
        throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
    }

    @Override
    public String cancelQuery(String queryId) {
        Optional<AsyncQueryJobMetadata> asyncQueryJobMetadata = this.asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
        if (asyncQueryJobMetadata.isPresent()) {
            return this.sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata.get());
        }
        throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
    }

    @Generated
    public AsyncQueryExecutorServiceImpl(AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService, SparkQueryDispatcher sparkQueryDispatcher, SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) {
        this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService;
        this.sparkQueryDispatcher = sparkQueryDispatcher;
        this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier;
    }
}

