/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.execution.session;

import java.util.Optional;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

/**
 * {@link Session} implementation backed by a job run started through an
 * {@link EMRServerlessClient}, with its session and statement models persisted
 * through a {@link StateStore}.
 *
 * <p>The cached {@link SessionModel} is replaced by {@link #open(CreateSessionRequest)} and
 * refreshed from the state store by {@link #submit(QueryRequest)}.
 */
public class InteractiveSession implements Session {
    private static final Logger LOG = LogManager.getLogger();

    /** Tag key under which the session id is attached to the create-session request. */
    public static final String SESSION_ID_TAG_KEY = "sid";

    private final SessionId sessionId;
    private final StateStore stateStore;
    private final EMRServerlessClient serverlessClient;
    /** Last known session model; reassigned by {@code open} and {@code submit}. */
    private SessionModel sessionModel;
    private final long sessionInactivityTimeoutMilli;
    private final TimeProvider timeProvider;

    /**
     * Starts the job run for this session and persists the initial session model.
     *
     * <p>The request's Spark submit parameters are configured for session execution, the request
     * is tagged with the session id under {@link #SESSION_ID_TAG_KEY}, the job is started, and the
     * resulting model is written via {@code StateStore.createSession}.
     *
     * @param createSessionRequest request describing the job to start
     * @throws IllegalStateException if a {@link VersionConflictEngineException} is raised, which
     *     is reported as the session already existing; the original exception is kept as the cause
     */
    @Override
    public void open(CreateSessionRequest createSessionRequest) {
        try {
            String sid = this.sessionId.getSessionId();
            String datasourceName = createSessionRequest.getDatasourceName();
            createSessionRequest.getSparkSubmitParametersBuilder().sessionExecution(sid, datasourceName);
            createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sid);
            StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sid);
            String jobId = this.serverlessClient.startJobRun(startJobRequest);
            String applicationId = startJobRequest.getApplicationId();
            this.sessionModel = SessionModel.initInteractiveSession(applicationId, jobId, this.sessionId, createSessionRequest.getDatasourceName());
            StateStore.createSession(this.stateStore, this.sessionModel.getDatasourceName()).apply(this.sessionModel);
        }
        catch (VersionConflictEngineException e) {
            // NOTE(review): the job run is started before the model is persisted, and nothing
            // here cancels it when the conflict is reported -- confirm that this is intended.
            String errorMsg = "session already exist. " + this.sessionId;
            LOG.error(errorMsg);
            // Chain the original exception so the root cause is not lost.
            throw new IllegalStateException(errorMsg, e);
        }
    }

    /**
     * Cancels the job run backing this session.
     *
     * @throws IllegalStateException if the session is not found in the state store
     */
    @Override
    public void close() {
        // Only the existence check matters here; the cached model is left untouched.
        this.loadStoredSessionModel();
        // NOTE(review): the meaning of the trailing 'false' flag is defined by
        // EMRServerlessClient.cancelJobRun and is not visible from this class.
        this.serverlessClient.cancelJobRun(this.sessionModel.getApplicationId(), this.sessionModel.getJobId(), false);
    }

    /**
     * Creates and opens a new statement for the given query within this session.
     *
     * <p>The cached session model is refreshed from the state store first; the statement is only
     * opened when the refreshed state is not contained in {@code SessionState.END_STATE}.
     *
     * @param request query to submit; its query id is also used to derive the statement id
     * @return id of the newly opened statement
     * @throws IllegalStateException if the session is not found in the state store, or if it is
     *     in an end state
     */
    @Override
    public StatementId submit(QueryRequest request) {
        this.sessionModel = this.loadStoredSessionModel();
        if (SessionState.END_STATE.contains(this.sessionModel.getSessionState())) {
            String errMsg = String.format("can't submit statement, session should not be in end state, current session state is: %s", this.sessionModel.getSessionState().getSessionState());
            LOG.debug(errMsg);
            throw new IllegalStateException(errMsg);
        }
        String qid = request.getQueryId().getId();
        StatementId statementId = StatementId.newStatementId(qid);
        Statement statement = Statement.builder()
            .sessionId(this.sessionId)
            .applicationId(this.sessionModel.getApplicationId())
            .jobId(this.sessionModel.getJobId())
            .stateStore(this.stateStore)
            .statementId(statementId)
            .langType(LangType.SQL)
            .datasourceName(this.sessionModel.getDatasourceName())
            .query(request.getQuery())
            .queryId(qid)
            .build();
        statement.open();
        return statementId;
    }

    /**
     * Looks up a statement in the state store and wraps its stored model in a {@link Statement}.
     *
     * @param stID id of the statement to look up
     * @return the statement, or an empty {@link Optional} when the state store has no such entry
     */
    @Override
    public Optional<Statement> get(StatementId stID) {
        return StateStore.getStatement(this.stateStore, this.sessionModel.getDatasourceName())
            .apply(stID.getId())
            .map(model -> Statement.builder()
                .sessionId(this.sessionId)
                .applicationId(model.getApplicationId())
                .jobId(model.getJobId())
                .statementId(model.getStatementId())
                .langType(model.getLangType())
                // Set the datasource name as well, matching statements built by submit().
                .datasourceName(this.sessionModel.getDatasourceName())
                .query(model.getQuery())
                .queryId(model.getQueryId())
                .stateStore(this.stateStore)
                .statementModel((StatementModel)model)
                .build());
    }

    /**
     * Reports whether this session can be used for the given data source.
     *
     * <p>All of the following must hold: the cached session state is neither
     * {@code SessionState.DEAD} nor {@code SessionState.FAIL}; the data source name carried by
     * the session id equals {@code dataSourceName}; and the time elapsed since the cached model's
     * last update does not exceed the configured inactivity timeout.
     *
     * @param dataSourceName data source name to match against
     * @return {@code true} when all three conditions hold
     */
    @Override
    public boolean isOperationalForDataSource(String dataSourceName) {
        SessionState state = this.sessionModel.getSessionState();
        boolean isSessionStateValid = state != SessionState.DEAD && state != SessionState.FAIL;
        boolean isDataSourceMatch = this.sessionId.getDataSourceName().equals(dataSourceName);
        long idleMillis = this.timeProvider.currentEpochMillis() - this.sessionModel.getLastUpdateTime();
        boolean isSessionUpdatedRecently = idleMillis <= this.sessionInactivityTimeoutMilli;
        return isSessionStateValid && isDataSourceMatch && isSessionUpdatedRecently;
    }

    /**
     * Reads this session's model from the state store, keyed by the cached model's datasource
     * name and id.
     *
     * @return the stored session model
     * @throws IllegalStateException if the state store has no entry for this session
     */
    private SessionModel loadStoredSessionModel() {
        return StateStore.getSession(this.stateStore, this.sessionModel.getDatasourceName())
            .apply(this.sessionModel.getId())
            .orElseThrow(() -> new IllegalStateException("session does not exist. " + this.sessionModel.getSessionId()));
    }

    @Generated
    InteractiveSession(SessionId sessionId, StateStore stateStore, EMRServerlessClient serverlessClient, SessionModel sessionModel, long sessionInactivityTimeoutMilli, TimeProvider timeProvider) {
        this.sessionId = sessionId;
        this.stateStore = stateStore;
        this.serverlessClient = serverlessClient;
        this.sessionModel = sessionModel;
        this.sessionInactivityTimeoutMilli = sessionInactivityTimeoutMilli;
        this.timeProvider = timeProvider;
    }

    /** Returns a new, empty builder for {@link InteractiveSession}. */
    @Generated
    public static InteractiveSessionBuilder builder() {
        return new InteractiveSessionBuilder();
    }

    @Override
    @Generated
    public SessionId getSessionId() {
        return this.sessionId;
    }

    @Generated
    public StateStore getStateStore() {
        return this.stateStore;
    }

    @Generated
    public EMRServerlessClient getServerlessClient() {
        return this.serverlessClient;
    }

    @Override
    @Generated
    public SessionModel getSessionModel() {
        return this.sessionModel;
    }

    @Generated
    public long getSessionInactivityTimeoutMilli() {
        return this.sessionInactivityTimeoutMilli;
    }

    @Generated
    public TimeProvider getTimeProvider() {
        return this.timeProvider;
    }

    /** Fluent builder that collects the constructor arguments of {@link InteractiveSession}. */
    @Generated
    public static class InteractiveSessionBuilder {
        @Generated
        private SessionId sessionId;
        @Generated
        private StateStore stateStore;
        @Generated
        private EMRServerlessClient serverlessClient;
        @Generated
        private SessionModel sessionModel;
        @Generated
        private long sessionInactivityTimeoutMilli;
        @Generated
        private TimeProvider timeProvider;

        @Generated
        InteractiveSessionBuilder() {
        }

        @Generated
        public InteractiveSessionBuilder sessionId(SessionId sessionId) {
            this.sessionId = sessionId;
            return this;
        }

        @Generated
        public InteractiveSessionBuilder stateStore(StateStore stateStore) {
            this.stateStore = stateStore;
            return this;
        }

        @Generated
        public InteractiveSessionBuilder serverlessClient(EMRServerlessClient serverlessClient) {
            this.serverlessClient = serverlessClient;
            return this;
        }

        @Generated
        public InteractiveSessionBuilder sessionModel(SessionModel sessionModel) {
            this.sessionModel = sessionModel;
            return this;
        }

        @Generated
        public InteractiveSessionBuilder sessionInactivityTimeoutMilli(long sessionInactivityTimeoutMilli) {
            this.sessionInactivityTimeoutMilli = sessionInactivityTimeoutMilli;
            return this;
        }

        @Generated
        public InteractiveSessionBuilder timeProvider(TimeProvider timeProvider) {
            this.timeProvider = timeProvider;
            return this;
        }

        /** Creates the session from the values collected so far; unset fields keep their Java defaults. */
        @Generated
        public InteractiveSession build() {
            return new InteractiveSession(this.sessionId, this.stateStore, this.serverlessClient, this.sessionModel, this.sessionInactivityTimeoutMilli, this.timeProvider);
        }

        @Generated
        public String toString() {
            return "InteractiveSession.InteractiveSessionBuilder(sessionId=" + this.sessionId + ", stateStore=" + this.stateStore + ", serverlessClient=" + this.serverlessClient + ", sessionModel=" + this.sessionModel + ", sessionInactivityTimeoutMilli=" + this.sessionInactivityTimeoutMilli + ", timeProvider=" + this.timeProvider + ")";
        }
    }
}

