/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.util;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionType;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.ad.common.exception.InternalFailure;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.Throttler;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.threadpool.ThreadPool;

/**
 * Helper around {@link Client} that issues requests in blocking (latched),
 * asynchronous, and throttled forms.
 *
 * <p>The throttled form allows the {@link Throttler} to reject a request for a
 * detector; in that case this class attempts to cancel the search task tagged
 * with that detector's opaque id and fails the new request.
 */
public class ClientUtil {
    /** Thread-context header used to tag requests so they can be found in the task list. */
    private static final String OPAQUE_ID_HEADER = "X-Opaque-Id";
    /** Prefix of the opaque-id value; the detector id is appended to it. */
    private static final String OPAQUE_ID_PREFIX = "[Anomaly Detector]:";

    private volatile TimeValue requestTimeout;
    private final Client client;
    private final Throttler throttler;
    private final ThreadPool threadPool;

    /**
     * @param setting    settings from which {@code AnomalyDetectorSettings.REQUEST_TIMEOUT} is read
     * @param client     client used to run list-tasks / cancel-tasks and generic actions
     * @param throttler  per-detector query registry consulted by the throttled request path
     * @param threadPool source of the thread context that carries the opaque-id header
     */
    @Inject
    public ClientUtil(Settings setting, Client client, Throttler throttler, ThreadPool threadPool) {
        this.requestTimeout = AnomalyDetectorSettings.REQUEST_TIMEOUT.get(setting);
        this.client = client;
        this.throttler = throttler;
        this.threadPool = threadPool;
    }

    /**
     * Sends {@code request} through {@code consumer} and blocks until the listener
     * is notified or the configured request timeout elapses.
     *
     * @param request  request to send
     * @param LOG      logger used for failure reporting
     * @param consumer function that actually dispatches the request with a listener
     * @return the response, or an empty Optional when the listener was notified of a
     *         failure (the failure is logged) or the response was {@code null}
     * @throws OpenSearchTimeoutException if no notification arrives within the timeout
     * @throws IllegalStateException      if the waiting thread is interrupted
     */
    public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> timedRequest(
        Request request,
        Logger LOG,
        BiConsumer<Request, ActionListener<Response>> consumer
    ) {
        AtomicReference<Response> respReference = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        consumer.accept(
            request,
            new LatchedActionListener<Response>(
                ActionListener.wrap(
                    respReference::set,
                    exception -> LOG.error("Cannot get response for request {}, error: {}", request, exception)
                ),
                latch
            )
        );
        return awaitResponse(request, LOG, latch, respReference);
    }

    /**
     * Sends {@code request} through {@code consumer} and relays the outcome to {@code listener}.
     *
     * @param request  request to send
     * @param consumer function that dispatches the request with a listener
     * @param listener receives the response or the failure
     */
    public <Request extends ActionRequest, Response extends ActionResponse> void asyncRequest(
        Request request,
        BiConsumer<Request, ActionListener<Response>> consumer,
        ActionListener<Response> listener
    ) {
        consumer.accept(request, ActionListener.wrap(listener::onResponse, listener::onFailure));
    }

    /**
     * Executes {@code action} on the underlying client and relays the outcome to {@code listener}.
     *
     * @param action   action type to execute
     * @param request  request for the action
     * @param listener receives the response or the failure
     */
    public <Request extends ActionRequest, Response extends ActionResponse> void execute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        this.client.execute(action, request, ActionListener.wrap(listener::onResponse, listener::onFailure));
    }

    /**
     * Applies {@code function} to {@code request} and blocks on the returned future
     * for at most the configured request timeout.
     *
     * @param request  request to send
     * @param function function producing the future for the request
     * @return the response obtained from the future
     * @deprecated blocking call; see {@link #asyncRequest} for the listener-based form
     */
    @Deprecated
    public <Request extends ActionRequest, Response extends ActionResponse> Response syncRequest(
        Request request,
        Function<Request, ActionFuture<Response>> function
    ) {
        return function.apply(request).actionGet(this.requestTimeout);
    }

    /**
     * Like {@link #timedRequest}, but first registers the request with the throttler
     * under the detector's id and tags the request's thread context with an opaque id
     * derived from that id.
     *
     * <p>If the throttler refuses the registration, a cancellation of the task tagged
     * for this detector is started and the call fails with {@link InternalFailure}.
     * The throttler entry is cleared when the listener is notified (success or failure)
     * or when dispatching the request throws; it is not cleared by this method on timeout.
     *
     * @param request  request to send
     * @param LOG      logger used for progress and failure reporting
     * @param consumer function that dispatches the request with a listener
     * @param detector detector on whose behalf the request is made
     * @return the response, or an empty Optional when the listener was notified of a
     *         failure (the failure is logged) or the response was {@code null}
     * @throws InternalFailure            if the throttler rejects the request
     * @throws OpenSearchTimeoutException if no notification arrives within the timeout
     * @throws IllegalStateException      if the waiting thread is interrupted
     */
    public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest(
        Request request,
        Logger LOG,
        BiConsumer<Request, ActionListener<Response>> consumer,
        AnomalyDetector detector
    ) {
        String detectorId = detector.getDetectorId();
        if (!this.throttler.insertFilteredQuery(detectorId, request)) {
            LOG.info("There is one query running for detectorId: {}. Trying to cancel the long running query", detectorId);
            this.cancelRunningQuery(this.client, detectorId, LOG);
            throw new InternalFailure(detector.getDetectorId(), "There is already a query running on AnomalyDetector");
        }
        AtomicReference<Response> respReference = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        // The stashed context is restored when the try-with-resources block exits,
        // so the opaque-id header only applies while the request is being dispatched.
        try (ThreadContext.StoredContext context = this.threadPool.getThreadContext().stashContext()) {
            assert context != null;
            this.threadPool.getThreadContext().putHeader(OPAQUE_ID_HEADER, opaqueIdFor(detectorId));
            consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
                this.throttler.clearFilteredQuery(detectorId);
                respReference.set(response);
            }, exception -> {
                this.throttler.clearFilteredQuery(detectorId);
                LOG.error("Cannot get response for request {}, error: {}", request, exception);
            }), latch));
        } catch (Exception e) {
            LOG.error("Failed to process the request for detectorId: {}.", detectorId);
            this.throttler.clearFilteredQuery(detectorId);
            throw e;
        }
        return awaitResponse(request, LOG, latch, respReference);
    }

    /**
     * @param detector detector to look up
     * @return {@code true} if the throttler currently holds a query entry for the detector
     */
    public boolean hasRunningQuery(AnomalyDetector detector) {
        return this.throttler.getFilteredQuery(detector.getDetectorId()).isPresent();
    }

    /** Builds the opaque-id value used both to tag requests and to match tasks. */
    private static String opaqueIdFor(String detectorId) {
        return OPAQUE_ID_PREFIX + detectorId;
    }

    /**
     * Waits on {@code latch} for the configured request timeout and returns whatever
     * the listener stored in {@code respReference}.
     */
    private <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> awaitResponse(
        Request request,
        Logger LOG,
        CountDownLatch latch,
        AtomicReference<Response> respReference
    ) {
        try {
            if (!latch.await(this.requestTimeout.getSeconds(), TimeUnit.SECONDS)) {
                throw new OpenSearchTimeoutException("Cannot get response within time limit: " + request.toString());
            }
            return Optional.ofNullable(respReference.get());
        } catch (InterruptedException e1) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Exception in waiting for result");
            throw new IllegalStateException(e1);
        }
    }

    /**
     * Lists tasks whose action matches {@code *search*} and hands the result to
     * {@link #onListTaskResponse}. A failure to list tasks is logged and rethrown
     * as {@link InternalFailure} from the failure callback.
     */
    private void cancelRunningQuery(Client client, String detectorId, Logger LOG) {
        ListTasksRequest listTasksRequest = new ListTasksRequest();
        listTasksRequest.setActions("*search*");
        client.execute(
            ListTasksAction.INSTANCE,
            listTasksRequest,
            ActionListener.wrap(response -> this.onListTaskResponse(response, detectorId, LOG), exception -> {
                LOG.error("List Tasks failed.", exception);
                throw new InternalFailure(detectorId, "Failed to list current tasks", exception);
            })
        );
    }

    /**
     * Finds the task tagged with this detector's opaque id and requests its cancellation.
     * A matching task that has a parent causes cancellation by parent task id and stops
     * the scan; otherwise the last matching task without a parent is cancelled by its own
     * id. If nothing matches, the throttler entry is cleared.
     */
    private void onListTaskResponse(ListTasksResponse listTasksResponse, String detectorId, Logger LOG) {
        List<TaskInfo> tasks = listTasksResponse.getTasks();
        String expectedOpaqueId = opaqueIdFor(detectorId);
        TaskId matchedParentTaskId = null;
        TaskId matchedSingleTaskId = null;
        for (TaskInfo task : tasks) {
            // Comparing from the expected value keeps this null-safe for tasks whose
            // headers are empty or do not contain the opaque-id header at all.
            if (!expectedOpaqueId.equals(task.getHeaders().get(OPAQUE_ID_HEADER))) {
                continue;
            }
            if (!task.getParentTaskId().equals(TaskId.EMPTY_TASK_ID)) {
                matchedParentTaskId = task.getParentTaskId();
                break;
            }
            matchedSingleTaskId = task.getTaskId();
        }
        if (matchedParentTaskId == null && matchedSingleTaskId == null) {
            LOG.info("Couldn't find task for detectorId: {}. Clean this entry from Throttler", detectorId);
            this.throttler.clearFilteredQuery(detectorId);
            return;
        }
        CancelTasksRequest cancelTaskRequest = new CancelTasksRequest();
        if (matchedParentTaskId != null) {
            cancelTaskRequest.setParentTaskId(matchedParentTaskId);
            LOG.info("Start to cancel task for parentTaskId: {}", matchedParentTaskId.toString());
        } else {
            cancelTaskRequest.setTaskId(matchedSingleTaskId);
            LOG.info("Start to cancel task for taskId: {}", matchedSingleTaskId.toString());
        }
        this.client.execute(
            CancelTasksAction.INSTANCE,
            cancelTaskRequest,
            ActionListener.wrap(response -> this.onCancelTaskResponse(response, detectorId, LOG), exception -> {
                LOG.error("Failed to cancel task for detectorId: " + detectorId, exception);
                throw new InternalFailure(detectorId, "Failed to cancel current tasks", exception);
            })
        );
    }

    /**
     * Clears the throttler entry when the cancel response reports no node or task
     * failures; otherwise logs and throws {@link InternalFailure}.
     */
    private void onCancelTaskResponse(CancelTasksResponse cancelTasksResponse, String detectorId, Logger LOG) {
        List<?> nodeFailures = cancelTasksResponse.getNodeFailures();
        List<?> taskFailures = cancelTasksResponse.getTaskFailures();
        if (nodeFailures.isEmpty() && taskFailures.isEmpty()) {
            LOG.info("Cancelling query for detectorId: {} succeeds. Clear entry from Throttler", detectorId);
            this.throttler.clearFilteredQuery(detectorId);
            return;
        }
        LOG.error("Failed to cancel task for detectorId: " + detectorId);
        throw new InternalFailure(detectorId, "Failed to cancel current tasks due to node or task failures");
    }
}

