/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.search.backpressure;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.util.TokenBucket;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.SearchBackpressureState;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.backpressure.stats.SearchShardTaskStats;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

/**
 * Lifecycle component that periodically inspects the node and, when the
 * configured conditions are met, cancels in-flight {@link SearchShardTask}s
 * that consume too many resources.
 *
 * <p>Each scheduled run ({@link #doRun()}) proceeds only if the mode is not
 * {@link SearchBackpressureMode#DISABLED}, at least one {@link NodeDuressTracker}
 * reports enough successive breaches, and the summed memory usage of the search
 * shard tasks reaches the configured heap threshold. Cancellations are actually
 * performed only in {@link SearchBackpressureMode#ENFORCED} mode and are throttled
 * by two token buckets: one driven by time and one driven by the count of
 * completed search shard tasks.
 *
 * <p>The service also listens for task completions (to update the completion
 * count and the resource usage trackers) and for settings changes (to rebuild
 * the token buckets).
 */
public class SearchBackpressureService extends AbstractLifecycleComponent
    implements TaskResourceTrackingService.TaskCompletionListener, SearchBackpressureSettings.Listener {
    private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class);

    /** Handle of the periodic job created in {@link #doStart()}; {@code null} until the service is started. */
    private volatile Scheduler.Cancellable scheduledFuture;

    private final SearchBackpressureSettings settings;
    private final TaskResourceTrackingService taskResourceTrackingService;
    private final ThreadPool threadPool;
    private final LongSupplier timeNanosSupplier;
    private final List<NodeDuressTracker> nodeDuressTrackers;
    private final List<TaskResourceUsageTracker> taskResourceUsageTrackers;

    /** Token bucket clocked by {@link #timeNanosSupplier}; replaced whenever rate or burst settings change. */
    private final AtomicReference<TokenBucket> taskCancellationRateLimiter = new AtomicReference<>();

    /** Token bucket clocked by the completion count in {@link #state}; replaced whenever ratio or burst settings change. */
    private final AtomicReference<TokenBucket> taskCancellationRatioLimiter = new AtomicReference<>();

    private final SearchBackpressureState state = new SearchBackpressureState();

    /**
     * Creates the service with the default trackers: node duress trackers for
     * process CPU and JVM heap usage (each compared, as a fraction, against the
     * thresholds from the node duress settings), and task usage trackers for
     * CPU, heap and elapsed time. {@link System#nanoTime()} is used as the clock.
     *
     * @param settings                    search backpressure settings
     * @param taskResourceTrackingService service providing the resource-aware tasks and completion events
     * @param threadPool                  thread pool used to schedule the periodic run
     */
    public SearchBackpressureService(
        SearchBackpressureSettings settings,
        TaskResourceTrackingService taskResourceTrackingService,
        ThreadPool threadPool
    ) {
        this(
            settings,
            taskResourceTrackingService,
            threadPool,
            System::nanoTime,
            List.of(
                new NodeDuressTracker(
                    () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold()
                ),
                new NodeDuressTracker(
                    () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold()
                )
            ),
            List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime))
        );
    }

    /**
     * Creates the service with explicitly supplied clock and trackers.
     *
     * @param settings                    search backpressure settings
     * @param taskResourceTrackingService service providing the resource-aware tasks and completion events
     * @param threadPool                  thread pool used to schedule the periodic run
     * @param timeNanosSupplier           clock handed to the rate-based token bucket
     * @param nodeDuressTrackers          trackers consulted by {@link #isNodeInDuress()}
     * @param taskResourceUsageTrackers   trackers consulted when evaluating and completing tasks
     */
    public SearchBackpressureService(
        SearchBackpressureSettings settings,
        TaskResourceTrackingService taskResourceTrackingService,
        ThreadPool threadPool,
        LongSupplier timeNanosSupplier,
        List<NodeDuressTracker> nodeDuressTrackers,
        List<TaskResourceUsageTracker> taskResourceUsageTrackers
    ) {
        // Assign every field before handing out 'this', so that a listener callback
        // arriving during construction never observes a null clock or tracker list.
        this.settings = settings;
        this.taskResourceTrackingService = taskResourceTrackingService;
        this.threadPool = threadPool;
        this.timeNanosSupplier = timeNanosSupplier;
        this.nodeDuressTrackers = nodeDuressTrackers;
        this.taskResourceUsageTrackers = taskResourceUsageTrackers;

        this.settings.addListener(this);
        this.taskResourceTrackingService.addTaskCompletionListener(this);

        this.taskCancellationRateLimiter.set(newRateLimiter());
        this.taskCancellationRatioLimiter.set(newRatioLimiter());
    }

    /** Builds a token bucket from the current cancellation rate and burst settings, clocked by {@link #timeNanosSupplier}. */
    private TokenBucket newRateLimiter() {
        return new TokenBucket(this.timeNanosSupplier, getSettings().getCancellationRateNanos(), getSettings().getCancellationBurst());
    }

    /** Builds a token bucket from the current cancellation ratio and burst settings, clocked by the completion count. */
    private TokenBucket newRatioLimiter() {
        return new TokenBucket(this.state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst());
    }

    /**
     * Performs a single monitoring pass. Returns early when the mode is DISABLED,
     * when the node is not in duress, or when the heap usage of search shard tasks
     * is below the configured threshold. Otherwise every eligible cancellation is
     * logged and, in ENFORCED mode only, executed subject to the token buckets.
     */
    void doRun() {
        SearchBackpressureMode mode = getSettings().getMode();
        if (mode == SearchBackpressureMode.DISABLED) {
            return;
        }

        if (!isNodeInDuress()) {
            return;
        }

        List<SearchShardTask> searchShardTasks = getSearchShardTasks();

        // Refresh the resource stats of these tasks before they are evaluated.
        this.taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));

        if (!isHeapUsageDominatedBySearch(searchShardTasks)) {
            return;
        }

        for (TaskCancellation taskCancellation : getTaskCancellations(searchShardTasks)) {
            logger.debug(
                "[{} mode] cancelling task [{}] due to high resource consumption [{}]",
                mode.getName(),
                taskCancellation.getTask().getId(),
                taskCancellation.getReasonString()
            );

            // In any mode other than ENFORCED the decision is only logged.
            if (mode != SearchBackpressureMode.ENFORCED) {
                continue;
            }

            // Both buckets are always queried (no short-circuit), so each one is asked on every iteration.
            boolean rateLimitReached = !this.taskCancellationRateLimiter.get().request();
            boolean ratioLimitReached = !this.taskCancellationRatioLimiter.get().request();

            // Stop only when BOTH buckets refuse; a grant from either one allows the cancellation.
            if (rateLimitReached && ratioLimitReached) {
                logger.debug("task cancellation limit reached");
                this.state.incrementLimitReachedCount();
                break;
            }

            taskCancellation.cancel();
        }
    }

    /**
     * Returns {@code true} if at least one node duress tracker reports a value
     * greater than or equal to the configured number of successive breaches.
     */
    boolean isNodeInDuress() {
        boolean isNodeInDuress = false;
        int numSuccessiveBreaches = getSettings().getNodeDuressSettings().getNumSuccessiveBreaches();

        for (NodeDuressTracker tracker : this.nodeDuressTrackers) {
            // Deliberately no early exit: check() is invoked on every tracker on every call.
            // NOTE(review): presumably check() advances per-tracker streak state - confirm in NodeDuressTracker.
            if (tracker.check() >= numSuccessiveBreaches) {
                isNodeInDuress = true;
            }
        }

        return isNodeInDuress;
    }

    /**
     * Returns {@code true} if the summed memory (in bytes) of the given tasks is at
     * least the configured total heap bytes threshold for search shard tasks.
     *
     * @param searchShardTasks tasks whose total resource stats are summed
     */
    boolean isHeapUsageDominatedBySearch(List<SearchShardTask> searchShardTasks) {
        long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
        long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();

        if (usage < threshold) {
            logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold);
            return false;
        }

        return true;
    }

    /**
     * Returns an unmodifiable list of the resource-aware tasks that are {@link SearchShardTask}s.
     */
    List<SearchShardTask> getSearchShardTasks() {
        return this.taskResourceTrackingService.getResourceAwareTasks()
            .values()
            .stream()
            .filter(task -> task instanceof SearchShardTask)
            .map(task -> (SearchShardTask) task)
            .collect(Collectors.toUnmodifiableList());
    }

    /**
     * Builds the {@link TaskCancellation} for a task by asking every task resource
     * usage tracker for a cancellation reason. For each reason found, the tracker's
     * {@code incrementCancellations} is registered as a callback; for search shard
     * tasks the service-level cancellation counter is registered as well.
     *
     * @param task task to evaluate
     * @return the cancellation holding all collected reasons and callbacks (possibly with no reasons)
     */
    TaskCancellation getTaskCancellation(CancellableTask task) {
        List<TaskCancellation.Reason> reasons = new ArrayList<>();
        List<Runnable> callbacks = new ArrayList<>();

        for (TaskResourceUsageTracker tracker : this.taskResourceUsageTrackers) {
            tracker.checkAndMaybeGetCancellationReason(task).ifPresent(reason -> {
                reasons.add(reason);
                callbacks.add(tracker::incrementCancellations);
            });
        }

        if (task instanceof SearchShardTask) {
            callbacks.add(this.state::incrementCancellationCount);
        }

        return new TaskCancellation(task, reasons, callbacks);
    }

    /**
     * Returns an unmodifiable list of the cancellations that are eligible for
     * cancellation, sorted in reverse natural order.
     *
     * @param tasks tasks to evaluate
     */
    List<TaskCancellation> getTaskCancellations(List<? extends CancellableTask> tasks) {
        return tasks.stream()
            .map(this::getTaskCancellation)
            .filter(TaskCancellation::isEligibleForCancellation)
            .sorted(Comparator.reverseOrder())
            .collect(Collectors.toUnmodifiableList());
    }

    SearchBackpressureSettings getSettings() {
        return this.settings;
    }

    SearchBackpressureState getState() {
        return this.state;
    }

    /**
     * Handles completion of a task. Ignored when the mode is DISABLED or the task
     * is not a {@link SearchShardTask}. Otherwise the completion count is incremented
     * (unless the task was cancelled) and every task resource usage tracker is
     * updated. Exceptions thrown by trackers are collected so that all trackers get
     * updated, then passed to {@link ExceptionsHelper#maybeThrowRuntimeAndSuppress}.
     *
     * @param task the completed task
     */
    @Override
    public void onTaskCompleted(Task task) {
        if (getSettings().getMode() == SearchBackpressureMode.DISABLED) {
            return;
        }

        if (!(task instanceof SearchShardTask)) {
            return;
        }

        SearchShardTask searchShardTask = (SearchShardTask) task;
        if (!searchShardTask.isCancelled()) {
            this.state.incrementCompletionCount();
        }

        List<Exception> exceptions = new ArrayList<>();
        for (TaskResourceUsageTracker tracker : this.taskResourceUsageTrackers) {
            try {
                tracker.update(searchShardTask);
            } catch (Exception e) {
                // Keep going so that one failing tracker does not prevent the others from being updated.
                exceptions.add(e);
            }
        }
        ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
    }

    /** Replaces the ratio-based token bucket with one built from the current settings. */
    @Override
    public void onCancellationRatioChanged() {
        this.taskCancellationRatioLimiter.set(newRatioLimiter());
    }

    /** Replaces the rate-based token bucket with one built from the current settings. */
    @Override
    public void onCancellationRateChanged() {
        this.taskCancellationRateLimiter.set(newRateLimiter());
    }

    /** Rebuilds both token buckets, since both are constructed with the burst setting. */
    @Override
    public void onCancellationBurstChanged() {
        onCancellationRatioChanged();
        onCancellationRateChanged();
    }

    /**
     * Schedules {@link #doRun()} with a fixed delay equal to the configured interval
     * on the "generic" thread pool. Exceptions thrown by a run are logged at debug
     * level and do not propagate to the scheduler.
     */
    @Override
    protected void doStart() {
        this.scheduledFuture = this.threadPool.scheduleWithFixedDelay(() -> {
            try {
                doRun();
            } catch (Exception e) {
                logger.debug("failure in search backpressure", e);
            }
        }, getSettings().getInterval(), "generic");
    }

    /** Cancels the periodic job if one was scheduled. */
    @Override
    protected void doStop() {
        // Read the volatile field once so the null check and the call see the same reference.
        Scheduler.Cancellable future = this.scheduledFuture;
        if (future != null) {
            future.cancel();
        }
    }

    @Override
    protected void doClose() throws IOException {
        // Nothing to release.
    }

    /**
     * Returns a snapshot of the backpressure stats: the cancellation count, the
     * limit-reached count, the per-tracker stats computed over the current search
     * shard tasks (keyed by tracker type), and the current mode.
     */
    public SearchBackpressureStats nodeStats() {
        List<SearchShardTask> searchShardTasks = getSearchShardTasks();

        SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
            this.state.getCancellationCount(),
            this.state.getLimitReachedCount(),
            this.taskResourceUsageTrackers.stream()
                .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
        );

        return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode());
    }
}

