/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.search.asynchronous.service;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchSecurityException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceModel;
import org.opensearch.search.asynchronous.response.AcknowledgedResponse;
import org.opensearch.search.asynchronous.utils.UserAuthUtils;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.threadpool.ThreadPool;

/**
 * Stores, reads, updates and deletes asynchronous search responses in the
 * {@link #ASYNC_SEARCH_RESPONSE_INDEX} index.
 *
 * <p>All operations are asynchronous: results and failures are reported through the
 * {@link ActionListener} passed to each method. Every code path is expected to complete
 * the listener exactly once.
 */
public class AsynchronousSearchPersistenceService {
    /** Source field holding the expiration time of a stored response. */
    public static final String EXPIRATION_TIME_MILLIS = "expiration_time_millis";
    /** Source field holding the start time of the search. */
    public static final String START_TIME_MILLIS = "start_time_millis";
    /** Source field holding the stored response. */
    public static final String RESPONSE = "response";
    /** Source field holding the stored error. */
    public static final String ERROR = "error";
    /** Source field holding the user that owns the search. */
    public static final String USER = "user";
    private static final Logger logger = LogManager.getLogger(AsynchronousSearchPersistenceService.class);
    /** Name of the index that holds persisted asynchronous search responses. */
    public static final String ASYNC_SEARCH_RESPONSE_INDEX = ".opendistro-asynchronous-search-response";
    /** Backoff used when an index request is rejected: exponential, built from a 250 ms delay and 14 retries. */
    public static final BackoffPolicy STORE_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250L), 14);
    /** Script parameter / source sub-field holding the backend roles of a user. */
    public static final String BACKEND_ROLES = "backend_roles";
    public static final String SETTING_INDEX_CODEC = "index.codec";
    public static final String BEST_COMPRESSION_CODEC = "best_compression";
    private final Client client;
    private final ClusterService clusterService;
    private final ThreadPool threadPool;

    public AsynchronousSearchPersistenceService(Client client, ClusterService clusterService, ThreadPool threadPool) {
        this.client = client;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
    }

    /**
     * Indexes the given model under {@code id}, creating the response index first when the
     * cluster state does not list it yet.
     *
     * @param id               document id to store the model under
     * @param persistenceModel model to persist
     * @param listener         notified with the index response or the failure
     */
    public void storeResponse(String id, AsynchronousSearchPersistenceModel persistenceModel, ActionListener<IndexResponse> listener) {
        if (indexExists()) {
            doStoreResult(id, persistenceModel, listener);
        } else {
            createIndexAndDoStoreResult(id, persistenceModel, listener);
        }
    }

    /**
     * Fetches the persisted model for {@code id} and hands it to the listener if
     * {@link UserAuthUtils#isUserValid} accepts the requesting user.
     *
     * <p>The listener fails with {@link ResourceNotFoundException} when the index or the document
     * is missing, and with a FORBIDDEN {@link OpenSearchSecurityException} when the user check fails.
     *
     * @param id       document id of the asynchronous search
     * @param user     requesting user, passed to the user check together with the stored user
     * @param listener notified with the model or the failure
     */
    public void getResponse(String id, User user, ActionListener<AsynchronousSearchPersistenceModel> listener) {
        if (!indexExists()) {
            listener.onFailure(new ResourceNotFoundException(id));
            return;
        }
        GetRequest request = new GetRequest(ASYNC_SEARCH_RESPONSE_INDEX, id);
        this.client.get(request, ActionListener.wrap(getResponse -> {
            if (!getResponse.isExists()) {
                listener.onFailure(new ResourceNotFoundException(id));
                return;
            }
            AsynchronousSearchPersistenceModel model = toPersistenceModel(getResponse.getSource());
            if (UserAuthUtils.isUserValid(user, model.getUser())) {
                listener.onResponse(model);
            } else {
                logger.debug("Invalid user requesting GET persisted context for asynchronous search [{}]", id);
                listener.onFailure(new OpenSearchSecurityException(
                    "User doesn't have necessary roles to access the asynchronous search [" + id + "]", RestStatus.FORBIDDEN));
            }
        }, exception -> {
            logger.error(() -> new ParameterizedMessage("Failed to get response for asynchronous search [{}]", id), exception);
            listener.onFailure(asException(ExceptionsHelper.unwrapCause(exception)));
        }));
    }

    /**
     * Deletes the persisted response for {@code id}.
     *
     * <p>With a {@code null} user a plain delete request is issued. Otherwise a scripted update is
     * used so that the delete only happens when the script's backend-role condition holds; a NOOP
     * result from that update is reported as a FORBIDDEN {@link OpenSearchSecurityException}.
     *
     * @param id       document id of the asynchronous search
     * @param user     requesting user, or {@code null} to delete unconditionally
     * @param listener notified with {@code true} on deletion, or with the failure
     */
    public void deleteResponse(String id, User user, ActionListener<Boolean> listener) {
        if (!indexExists()) {
            logger.debug("Async search index [{}] doesn't exists", ASYNC_SEARCH_RESPONSE_INDEX);
            listener.onFailure(new ResourceNotFoundException(id));
            return;
        }
        Consumer<Exception> onFailure = e -> {
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            if (cause instanceof DocumentMissingException) {
                logger.debug(() -> new ParameterizedMessage("Async search response doc already deleted {}", id), e);
                listener.onFailure(new ResourceNotFoundException(id));
            } else {
                logger.debug(() -> new ParameterizedMessage("Failed to delete asynchronous search for id {}", id), e);
                listener.onFailure(asException(cause));
            }
        };
        if (user == null) {
            this.client.delete(new DeleteRequest(ASYNC_SEARCH_RESPONSE_INDEX, id), ActionListener.wrap(deleteResponse -> {
                if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) {
                    logger.debug("Delete asynchronous search {} successful. Returned result {}", id, deleteResponse.getResult());
                    listener.onResponse(true);
                } else {
                    logger.debug("Delete asynchronous search {} unsuccessful. Returned result {}", id, deleteResponse.getResult());
                    listener.onFailure(new ResourceNotFoundException(id));
                }
            }, onFailure));
            return;
        }
        UpdateRequest updateRequest = new UpdateRequest(ASYNC_SEARCH_RESPONSE_INDEX, id);
        String scriptCode = "if (ctx._source.user == null || ctx._source.user.backend_roles == null || ( params.backend_roles!=null && params.backend_roles.containsAll(ctx._source.user.backend_roles))) { ctx.op = 'delete' } else { ctx.op = 'none' }";
        Map<String, Object> params = new HashMap<>();
        params.put(BACKEND_ROLES, user.getBackendRoles());
        updateRequest.script(new Script(ScriptType.INLINE, "painless", scriptCode, params));
        this.client.update(updateRequest, ActionListener.wrap(updateResponse -> {
            switch (updateResponse.getResult()) {
                case UPDATED:
                    listener.onFailure(new IllegalStateException("Document updated when requesting delete for asynchronous search id " + id));
                    break;
                case NOOP:
                    // The script chose ctx.op = 'none', i.e. the role condition did not hold.
                    listener.onFailure(new OpenSearchSecurityException(
                        "User doesn't have necessary roles to access the asynchronous search with id " + id, RestStatus.FORBIDDEN));
                    break;
                case NOT_FOUND:
                    listener.onFailure(new ResourceNotFoundException(id));
                    break;
                case DELETED:
                    listener.onResponse(true);
                    break;
                default:
                    // Never leave the listener uncompleted on a result this method does not expect.
                    listener.onFailure(new IllegalStateException("Unexpected result [" + updateResponse.getResult()
                        + "] when requesting delete for asynchronous search id " + id));
                    break;
            }
        }, onFailure));
    }

    /**
     * Sets the expiration time of the persisted response for {@code id} and returns the
     * resulting document as a model.
     *
     * <p>With a {@code null} user a partial-document update is issued. Otherwise a scripted update
     * is used that only changes the expiration time when the script's backend-role condition holds;
     * a NOOP result in that case is reported as a FORBIDDEN {@link OpenSearchSecurityException}.
     *
     * @param id                   document id of the asynchronous search
     * @param expirationTimeMillis new expiration time
     * @param user                 requesting user, or {@code null} to update unconditionally
     * @param listener             notified with the updated model or the failure
     */
    public void updateExpirationTime(String id, long expirationTimeMillis, User user, ActionListener<AsynchronousSearchPersistenceModel> listener) {
        if (!indexExists()) {
            listener.onFailure(new ResourceNotFoundException(id));
            return;
        }
        UpdateRequest updateRequest = new UpdateRequest(ASYNC_SEARCH_RESPONSE_INDEX, id);
        updateRequest.retryOnConflict(5);
        if (user == null) {
            Map<String, Object> source = new HashMap<>();
            source.put(EXPIRATION_TIME_MILLIS, expirationTimeMillis);
            updateRequest.doc(source, XContentType.JSON);
        } else {
            String scriptCode = "if (ctx._source.user == null || ctx._source.user.backend_roles == null || (params.backend_roles != null && params.backend_roles.containsAll(ctx._source.user.backend_roles))) { ctx._source.expiration_time_millis = params.expiration_time_millis } else { ctx.op = 'none' }";
            Map<String, Object> params = new HashMap<>();
            params.put(BACKEND_ROLES, user.getBackendRoles());
            params.put(EXPIRATION_TIME_MILLIS, expirationTimeMillis);
            updateRequest.script(new Script(ScriptType.INLINE, "painless", scriptCode, params));
        }
        // Ask for the source so the response can be turned back into a model.
        updateRequest.fetchSource(FetchSourceContext.FETCH_SOURCE);
        this.client.update(updateRequest, ActionListener.wrap(updateResponse -> {
            switch (updateResponse.getResult()) {
                case NOOP:
                    if (user != null) {
                        // The script chose ctx.op = 'none', i.e. the role condition did not hold.
                        listener.onFailure(new OpenSearchSecurityException(
                            "User doesn't have necessary roles to access the asynchronous search with id " + id, RestStatus.FORBIDDEN));
                    } else {
                        listener.onResponse(toPersistenceModel(updateResponse.getGetResult().getSource()));
                    }
                    break;
                case UPDATED:
                    listener.onResponse(toPersistenceModel(updateResponse.getGetResult().getSource()));
                    break;
                case NOT_FOUND:
                case DELETED:
                    logger.debug("Update Result [{}] for id [{}], expiration time requested, [{}]", updateResponse.getResult(), id, expirationTimeMillis);
                    listener.onFailure(new ResourceNotFoundException(id));
                    break;
                default:
                    // Never leave the listener uncompleted on a result this method does not expect.
                    listener.onFailure(new IllegalStateException("Unexpected result [" + updateResponse.getResult()
                        + "] when updating expiration time for asynchronous search id " + id));
                    break;
            }
        }, exception -> {
            Throwable cause = ExceptionsHelper.unwrapCause(exception);
            if (cause instanceof DocumentMissingException) {
                listener.onFailure(new ResourceNotFoundException(id));
            } else {
                logger.error(() -> new ParameterizedMessage("Exception occurred updating expiration time for asynchronous search [{}]", id), exception);
                listener.onFailure(asException(cause));
            }
        }));
    }

    /**
     * Issues a delete-by-query for every document whose expiration time is less than or equal to
     * {@code expirationTimeInMillis}.
     *
     * <p>The listener receives an acknowledged response when the index does not exist or the
     * request reports no bulk/search failures, and an unacknowledged response when it reports any.
     *
     * @param listener               notified with the acknowledgement or the failure
     * @param expirationTimeInMillis upper bound (inclusive) of the expiration times to delete
     */
    public void deleteExpiredResponses(ActionListener<AcknowledgedResponse> listener, long expirationTimeInMillis) {
        if (!indexExists()) {
            logger.debug("Async search index not yet created! Nothing to delete.");
            listener.onResponse(new AcknowledgedResponse(true));
            return;
        }
        DeleteByQueryRequest request = new DeleteByQueryRequest(ASYNC_SEARCH_RESPONSE_INDEX)
            .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_MILLIS).lte(expirationTimeInMillis));
        this.client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> {
            boolean hasBulkFailures = deleteResponse.getBulkFailures() != null && deleteResponse.getBulkFailures().size() > 0;
            boolean hasSearchFailures = deleteResponse.getSearchFailures() != null && deleteResponse.getSearchFailures().size() > 0;
            if (hasBulkFailures || hasSearchFailures) {
                logger.error("Failed to delete expired asynchronous search responses with bulk failures[{}] / search failures [{}]", deleteResponse.getBulkFailures(), deleteResponse.getSearchFailures());
                listener.onResponse(new AcknowledgedResponse(false));
            } else {
                logger.debug("Successfully deleted expired responses");
                listener.onResponse(new AcknowledgedResponse(true));
            }
        }, e -> {
            logger.error(() -> new ParameterizedMessage("Failed to delete expired response for expiration time {}", expirationTimeInMillis), e);
            listener.onFailure(asException(ExceptionsHelper.unwrapCause(e)));
        }));
    }

    /**
     * Creates the response index and then stores the model. A
     * {@link ResourceAlreadyExistsException} from the create call is treated as success, since
     * the index is then present and the store can proceed.
     */
    private void createIndexAndDoStoreResult(String id, AsynchronousSearchPersistenceModel persistenceModel, ActionListener<IndexResponse> listener) {
        this.client.admin().indices().prepareCreate(ASYNC_SEARCH_RESPONSE_INDEX)
            .setMapping(mapping())
            .setSettings(indexSettings())
            .execute(ActionListener.wrap(createIndexResponse -> doStoreResult(id, persistenceModel, listener), exception -> {
                if (!(ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException)) {
                    listener.onFailure(exception);
                    return;
                }
                try {
                    doStoreResult(id, persistenceModel, listener);
                } catch (Exception inner) {
                    // Keep the original create failure visible alongside the store failure.
                    inner.addSuppressed(exception);
                    listener.onFailure(inner);
                }
            }));
    }

    /** Builds the index request for the model and submits it with a fresh backoff iterator. */
    private void doStoreResult(String id, AsynchronousSearchPersistenceModel model, ActionListener<IndexResponse> listener) {
        Map<String, Object> source = new HashMap<>();
        source.put(RESPONSE, model.getResponse());
        source.put(ERROR, model.getError());
        source.put(EXPIRATION_TIME_MILLIS, model.getExpirationTimeMillis());
        source.put(START_TIME_MILLIS, model.getStartTimeMillis());
        source.put(USER, model.getUser());
        IndexRequestBuilder indexRequestBuilder = this.client.prepareIndex(ASYNC_SEARCH_RESPONSE_INDEX).setId(id).setSource(source, XContentType.JSON);
        doStoreResult(STORE_BACKOFF_POLICY.iterator(), indexRequestBuilder, listener);
    }

    /**
     * Executes the index request. When it fails with an
     * {@link OpenSearchRejectedExecutionException} and the backoff iterator still has delays left,
     * the request is rescheduled after the next delay; any other failure is passed to the listener.
     */
    private void doStoreResult(final Iterator<TimeValue> backoff, final IndexRequestBuilder indexRequestBuilder, final ActionListener<IndexResponse> listener) {
        indexRequestBuilder.execute(new ActionListener<IndexResponse>() {

            @Override
            public void onResponse(IndexResponse indexResponse) {
                listener.onResponse(indexResponse);
            }

            @Override
            public void onFailure(Exception e) {
                Throwable cause = ExceptionsHelper.unwrapCause(e);
                if (cause instanceof OpenSearchRejectedExecutionException && backoff.hasNext()) {
                    TimeValue wait = backoff.next();
                    logger.warn(() -> new ParameterizedMessage("failed to store asynchronous search response [{}], retrying in [{}]", indexRequestBuilder.request().id(), wait), e);
                    AsynchronousSearchPersistenceService.this.threadPool.schedule(
                        () -> AsynchronousSearchPersistenceService.this.doStoreResult(backoff, indexRequestBuilder, listener),
                        wait,
                        ThreadPool.Names.SAME);
                } else {
                    logger.error(() -> new ParameterizedMessage("failed to store asynchronous search response [{}], not retrying", indexRequestBuilder.request().id()), e);
                    listener.onFailure(e);
                }
            }
        });
    }

    /** Settings applied when the response index is created. */
    private Settings indexSettings() {
        return Settings.builder()
            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5)
            .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1")
            .put("index.priority", Integer.MAX_VALUE)
            .put("index.hidden", true)
            .put(SETTING_INDEX_CODEC, BEST_COMPRESSION_CODEC)
            .build();
    }

    /**
     * Mapping applied when the response index is created: the two time fields are
     * {@code epoch_millis} dates, response and error are {@code binary}.
     */
    private XContentBuilder mapping() {
        try {
            XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
            builder.startObject()
                .startObject("properties")
                .startObject(START_TIME_MILLIS).field("type", "date").field("format", "epoch_millis").endObject()
                .startObject(EXPIRATION_TIME_MILLIS).field("type", "date").field("format", "epoch_millis").endObject()
                .startObject(RESPONSE).field("type", "binary").endObject()
                .startObject(ERROR).field("type", "binary").endObject()
                .endObject()
                .endObject();
            return builder;
        } catch (IOException e) {
            throw new IllegalArgumentException("Async search persistence mapping cannot be read correctly.", e);
        }
    }

    /** Returns whether the routing table of the current cluster state lists the response index. */
    private boolean indexExists() {
        return this.clusterService.state().routingTable().hasIndex(ASYNC_SEARCH_RESPONSE_INDEX);
    }

    /**
     * Converts a document source map into a persistence model.
     *
     * <p>The time fields are read through {@link Number} rather than cast to {@link Long}, so a
     * value that was parsed into a narrower numeric type does not cause a
     * {@link ClassCastException}. Absent response/error fields yield {@code null}.
     *
     * @param source document source as returned by a get or update response
     * @return the model built from the source fields
     * @throws IOException declared for the user parsing step
     */
    @SuppressWarnings("unchecked")
    private static AsynchronousSearchPersistenceModel toPersistenceModel(Map<String, Object> source) throws IOException {
        long startTimeMillis = ((Number) source.get(START_TIME_MILLIS)).longValue();
        long expirationTimeMillis = ((Number) source.get(EXPIRATION_TIME_MILLIS)).longValue();
        return new AsynchronousSearchPersistenceModel(
            startTimeMillis,
            expirationTimeMillis,
            (String) source.get(RESPONSE),
            (String) source.get(ERROR),
            UserAuthUtils.parseUser((Map<String, Object>) source.get(USER)));
    }

    /**
     * Returns {@code cause} itself when it is an {@link Exception}; otherwise wraps it in a
     * {@link NotSerializableExceptionWrapper} so it can be passed to a listener.
     */
    private static Exception asException(Throwable cause) {
        return cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause);
    }
}

