/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.execution.statestore;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.session.SessionType;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateModel;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;

public class StateStore {
    public static String SETTINGS_FILE_NAME = "query_execution_request_settings.yml";
    public static String MAPPING_FILE_NAME = "query_execution_request_mapping.yml";
    public static Function<String, String> DATASOURCE_TO_REQUEST_INDEX = datasourceName -> String.format("%s_%s", ".query_execution_request", datasourceName.toLowerCase(Locale.ROOT));
    public static String ALL_DATASOURCE = "*";
    private static final Logger LOG = LogManager.getLogger();
    private final Client client;
    private final ClusterService clusterService;

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public <T extends StateModel> T create(T st, StateModel.CopyBuilder<T> builder, String indexName) {
        try {
            if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
                this.createIndex(indexName);
            }
            IndexRequest indexRequest = (IndexRequest)new IndexRequest(indexName).id(st.getId()).source(st.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)).setIfSeqNo(st.getSeqNo()).setIfPrimaryTerm(st.getPrimaryTerm()).create(true).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();){
                IndexResponse indexResponse = (IndexResponse)this.client.index(indexRequest).actionGet();
                if (indexResponse.getResult().equals((Object)DocWriteResponse.Result.CREATED)) {
                    LOG.debug("Successfully created doc. id: {}", (Object)st.getId());
                    T stateModel = builder.of(st, indexResponse.getSeqNo(), indexResponse.getPrimaryTerm());
                    return stateModel;
                }
                throw new RuntimeException(String.format(Locale.ROOT, "Failed create doc. id: %s, error: %s", st.getId(), indexResponse.getResult().getLowercase()));
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public <T extends StateModel> Optional<T> get(String sid, StateModel.FromXContent<T> builder, String indexName) {
        try {
            if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
                this.createIndex(indexName);
                return Optional.empty();
            }
            GetRequest getRequest = ((GetRequest)new GetRequest().index(indexName)).id(sid).refresh(true);
            try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();){
                GetResponse getResponse = (GetResponse)this.client.get(getRequest).actionGet();
                if (getResponse.isExists()) {
                    XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString());
                    parser.nextToken();
                    Optional<T> optional2 = Optional.of(builder.fromXContent(parser, getResponse.getSeqNo(), getResponse.getPrimaryTerm()));
                    return optional2;
                }
                Optional optional = Optional.empty();
                return optional;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public <T extends StateModel, S> T updateState(T st, S state, StateModel.StateCopyBuilder<T, S> builder, String indexName) {
        try {
            T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm());
            UpdateRequest updateRequest = ((UpdateRequest)new UpdateRequest().index(indexName)).id(((StateModel)model).getId()).setIfSeqNo(((StateModel)model).getSeqNo()).setIfPrimaryTerm(((StateModel)model).getPrimaryTerm()).doc(model.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)).fetchSource(true).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();){
                UpdateResponse updateResponse = (UpdateResponse)this.client.update(updateRequest).actionGet();
                LOG.debug("Successfully update doc. id: {}", (Object)st.getId());
                T t = builder.of(model, state, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm());
                return t;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    public boolean delete(String sid, String indexName) {
        boolean bl;
        block9: {
            if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
                return true;
            }
            ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();
            try {
                DeleteRequest deleteRequest = new DeleteRequest(indexName, sid);
                DeleteResponse deleteResponse = (DeleteResponse)this.client.delete(deleteRequest).actionGet();
                boolean bl2 = bl = deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
                if (ignored == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (ignored != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    throw new RuntimeException(String.format("Failed to delete index state doc %s in index %s", sid, indexName), e);
                }
            }
            ignored.close();
        }
        return bl;
    }

    private void createIndex(String indexName) {
        try {
            ActionFuture createIndexResponseActionFuture;
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
            createIndexRequest.mapping(this.loadConfigFromResource(MAPPING_FILE_NAME), XContentType.YAML).settings(this.loadConfigFromResource(SETTINGS_FILE_NAME), XContentType.YAML);
            try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();){
                createIndexResponseActionFuture = this.client.admin().indices().create(createIndexRequest);
            }
            CreateIndexResponse createIndexResponse = (CreateIndexResponse)createIndexResponseActionFuture.actionGet();
            if (!createIndexResponse.isAcknowledged()) {
                throw new RuntimeException("Index creation is not acknowledged.");
            }
            LOG.info("Index: {} creation Acknowledged", (Object)indexName);
        }
        catch (Throwable e) {
            throw new RuntimeException("Internal server error while creating" + indexName + " index:: " + e.getMessage());
        }
    }

    private long count(String indexName, QueryBuilder query) {
        ActionFuture searchResponseActionFuture;
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(query);
        searchSourceBuilder.size(0);
        SearchRequest searchRequest = new SearchRequest().indices(new String[]{indexName}).preference("_primary_first").source(searchSourceBuilder);
        try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashContext();){
            searchResponseActionFuture = this.client.search(searchRequest);
        }
        SearchResponse searchResponse = (SearchResponse)searchResponseActionFuture.actionGet();
        if (searchResponse.status().getStatus() != 200) {
            throw new RuntimeException("Fetching job metadata information failed with status : " + searchResponse.status());
        }
        return searchResponse.getHits().getTotalHits().value;
    }

    private String loadConfigFromResource(String fileName) throws IOException {
        InputStream fileStream = StateStore.class.getClassLoader().getResourceAsStream(fileName);
        return IOUtils.toString((InputStream)fileStream, (Charset)StandardCharsets.UTF_8);
    }

    public static Function<StatementModel, StatementModel> createStatement(StateStore stateStore, String datasourceName) {
        return st -> stateStore.create((StateModel)st, StatementModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<String, Optional<StatementModel>> getStatement(StateStore stateStore, String datasourceName) {
        return docId -> stateStore.get((String)docId, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static BiFunction<StatementModel, StatementState, StatementModel> updateStatementState(StateStore stateStore, String datasourceName) {
        return (old, state) -> stateStore.updateState((StateModel)old, (Object)state, StatementModel::copyWithState, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<SessionModel, SessionModel> createSession(StateStore stateStore, String datasourceName) {
        return session -> stateStore.create((StateModel)session, SessionModel::of, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<String, Optional<SessionModel>> getSession(StateStore stateStore, String datasourceName) {
        return docId -> stateStore.get((String)docId, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static BiFunction<SessionModel, SessionState, SessionModel> updateSessionState(StateStore stateStore, String datasourceName) {
        return (old, state) -> stateStore.updateState((StateModel)old, (Object)state, SessionModel::copyWithState, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<AsyncQueryJobMetadata, AsyncQueryJobMetadata> createJobMetaData(StateStore stateStore, String datasourceName) {
        return jobMetadata -> stateStore.create((StateModel)jobMetadata, AsyncQueryJobMetadata::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<String, Optional<AsyncQueryJobMetadata>> getJobMetaData(StateStore stateStore, String datasourceName) {
        return docId -> stateStore.get((String)docId, AsyncQueryJobMetadata::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Supplier<Long> activeSessionsCount(StateStore stateStore, String datasourceName) {
        return () -> stateStore.count(DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), (QueryBuilder)QueryBuilders.boolQuery().must((QueryBuilder)QueryBuilders.termQuery((String)"type", (String)"session")).must((QueryBuilder)QueryBuilders.termQuery((String)"sessionType", (String)SessionType.INTERACTIVE.getSessionType())).must((QueryBuilder)QueryBuilders.termQuery((String)"state", (String)SessionState.RUNNING.getSessionState())));
    }

    public static BiFunction<FlintIndexStateModel, FlintIndexState, FlintIndexStateModel> updateFlintIndexState(StateStore stateStore, String datasourceName) {
        return (old, state) -> stateStore.updateState((StateModel)old, (Object)state, FlintIndexStateModel::copyWithState, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<String, Optional<FlintIndexStateModel>> getFlintIndexState(StateStore stateStore, String datasourceName) {
        return docId -> stateStore.get((String)docId, FlintIndexStateModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<FlintIndexStateModel, FlintIndexStateModel> createFlintIndexState(StateStore stateStore, String datasourceName) {
        return st -> stateStore.create((StateModel)st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<String, Boolean> deleteFlintIndexState(StateStore stateStore, String datasourceName) {
        return docId -> stateStore.delete((String)docId, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
    }

    public static Function<IndexDMLResult, IndexDMLResult> createIndexDMLResult(StateStore stateStore, String indexName) {
        return result -> stateStore.create((StateModel)result, IndexDMLResult::copy, indexName);
    }

    public static Supplier<Long> activeRefreshJobCount(StateStore stateStore, String datasourceName) {
        return () -> stateStore.count(DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), (QueryBuilder)QueryBuilders.boolQuery().must((QueryBuilder)QueryBuilders.termQuery((String)"type", (String)"flintindexstate")).must((QueryBuilder)QueryBuilders.termQuery((String)"state", (String)FlintIndexState.REFRESHING.getState())));
    }

    public static Supplier<Long> activeStatementsCount(StateStore stateStore, String datasourceName) {
        return () -> stateStore.count(DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), (QueryBuilder)QueryBuilders.boolQuery().must((QueryBuilder)QueryBuilders.termQuery((String)"type", (String)"statement")).should((QueryBuilder)QueryBuilders.termsQuery((String)"state", (String[])new String[]{StatementState.RUNNING.getState(), StatementState.WAITING.getState()})));
    }

    @Generated
    public StateStore(Client client, ClusterService clusterService) {
        this.client = client;
        this.clusterService = clusterService;
    }
}

