/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.AsyncQueryHandler;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

public class InteractiveQueryHandler
extends AsyncQueryHandler {
    private final SessionManager sessionManager;
    private final JobExecutionResponseReader jobExecutionResponseReader;
    private final LeaseManager leaseManager;

    @Override
    protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        String queryId = asyncQueryJobMetadata.getQueryId().getId();
        return this.jobExecutionResponseReader.getResultWithQueryId(queryId, asyncQueryJobMetadata.getResultIndex());
    }

    @Override
    protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        JSONObject result = new JSONObject();
        String queryId = asyncQueryJobMetadata.getQueryId().getId();
        Statement statement = this.getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId);
        StatementState statementState = statement.getStatementState();
        result.put("status", (Object)statementState.getState());
        result.put("error", (Object)Optional.of(statement.getStatementModel().getError()).orElse(""));
        return result;
    }

    @Override
    public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        String queryId = asyncQueryJobMetadata.getQueryId().getId();
        this.getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId).cancel();
        return queryId;
    }

    @Override
    public DispatchQueryResponse submit(DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
        SessionId sessionId;
        Optional<Session> createdSession;
        Session session = null;
        String clusterName = dispatchQueryRequest.getClusterName();
        Map<String, String> tags = context.getTags();
        DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
        this.leaseManager.borrow(new LeaseRequest(JobType.INTERACTIVE, dispatchQueryRequest.getDatasource()));
        if (dispatchQueryRequest.getSessionId() != null && (createdSession = this.sessionManager.getSession(sessionId = new SessionId(dispatchQueryRequest.getSessionId()))).isPresent()) {
            session = createdSession.get();
        }
        if (session == null || !session.isOperationalForDataSource(dispatchQueryRequest.getDatasource())) {
            tags.put("type", JobType.INTERACTIVE.getText());
            session = this.sessionManager.createSession(new CreateSessionRequest(clusterName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder().className("org.apache.spark.sql.FlintREPL").clusterName(clusterName).dataSource(dataSourceMetadata).extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()), tags, dataSourceMetadata.getResultIndex(), dataSourceMetadata.getName()));
            MetricUtils.incrementNumericalMetric((MetricName)MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT);
        }
        session.submit(new QueryRequest(context.getQueryId(), dispatchQueryRequest.getLangType(), dispatchQueryRequest.getQuery()));
        return new DispatchQueryResponse(context.getQueryId(), session.getSessionModel().getJobId(), dataSourceMetadata.getResultIndex(), session.getSessionId().getSessionId());
    }

    private Statement getStatementByQueryId(String sid, String qid) {
        SessionId sessionId = new SessionId(sid);
        Optional<Session> session = this.sessionManager.getSession(sessionId);
        if (session.isPresent()) {
            StatementId statementId = new StatementId(qid);
            Optional<Statement> statement = session.get().get(statementId);
            if (statement.isPresent()) {
                return statement.get();
            }
            throw new IllegalArgumentException("no statement found. " + statementId);
        }
        throw new IllegalArgumentException("no session found. " + sessionId);
    }

    @Generated
    public InteractiveQueryHandler(SessionManager sessionManager, JobExecutionResponseReader jobExecutionResponseReader, LeaseManager leaseManager) {
        this.sessionManager = sessionManager;
        this.jobExecutionResponseReader = jobExecutionResponseReader;
        this.leaseManager = leaseManager;
    }
}

