/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.index.reindex.remote;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.http.ContentTooLongException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.index.reindex.remote.RemoteRequestBuilders;
import org.elasticsearch.index.reindex.remote.RemoteResponseParsers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

public class RemoteScrollableHitSource
extends ScrollableHitSource {
    private final RestClient client;
    private final BytesReference query;
    private final SearchRequest searchRequest;
    Version remoteVersion;

    public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry, Consumer<Exception> fail, RestClient client, BytesReference query, SearchRequest searchRequest) {
        super(logger, backoffPolicy, threadPool, countSearchRetry, fail);
        this.query = query;
        this.searchRequest = searchRequest;
        this.client = client;
    }

    @Override
    public void close() {
        try {
            this.client.close();
        }
        catch (IOException e) {
            this.fail.accept(new IOException("couldn't close the remote connection", e));
        }
    }

    @Override
    protected void doStart(Consumer<? super ScrollableHitSource.Response> onResponse) {
        this.lookupRemoteVersion(version -> {
            this.remoteVersion = version;
            this.execute("POST", RemoteRequestBuilders.initialSearchPath(this.searchRequest), RemoteRequestBuilders.initialSearchParams(this.searchRequest, version), RemoteRequestBuilders.initialSearchEntity(this.query), (BiFunction)RemoteResponseParsers.RESPONSE_PARSER, (Consumer)r -> this.onStartResponse(onResponse, (ScrollableHitSource.Response)r));
        });
    }

    void lookupRemoteVersion(Consumer<Version> onVersion) {
        this.execute("GET", "", Collections.emptyMap(), null, (BiFunction)RemoteResponseParsers.MAIN_ACTION_PARSER, (Consumer)onVersion);
    }

    private void onStartResponse(Consumer<? super ScrollableHitSource.Response> onResponse, ScrollableHitSource.Response response) {
        if (Strings.hasLength((String)response.getScrollId()) && response.getHits().isEmpty()) {
            this.logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", (Object)response.getScrollId());
            this.doStartNextScroll(response.getScrollId(), TimeValue.timeValueMillis((long)0L), onResponse);
        } else {
            onResponse.accept(response);
        }
    }

    @Override
    protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super ScrollableHitSource.Response> onResponse) {
        this.execute("POST", RemoteRequestBuilders.scrollPath(), RemoteRequestBuilders.scrollParams(TimeValue.timeValueNanos((long)(this.searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()))), RemoteRequestBuilders.scrollEntity(scrollId), (BiFunction)RemoteResponseParsers.RESPONSE_PARSER, (Consumer)onResponse);
    }

    @Override
    protected void clearScroll(final String scrollId) {
        this.client.performRequestAsync("DELETE", RemoteRequestBuilders.scrollPath(), Collections.emptyMap(), RemoteRequestBuilders.scrollEntity(scrollId), new ResponseListener(){

            public void onSuccess(Response response) {
                RemoteScrollableHitSource.this.logger.debug("Successfully cleared [{}]", (Object)scrollId);
            }

            public void onFailure(Exception t) {
                RemoteScrollableHitSource.this.logger.warn(() -> new ParameterizedMessage("Failed to clear scroll [{}]", (Object)scrollId), (Throwable)t);
            }
        }, new Header[0]);
    }

    private <T> void execute(final String method, final String uri, final Map<String, String> params, final HttpEntity entity, final BiFunction<XContentParser, ParseFieldMatcherSupplier, T> parser, final Consumer<? super T> listener) {
        final ThreadContext.StoredContext ctx = this.threadPool.getThreadContext().newStoredContext();
        class RetryHelper
        extends AbstractRunnable {
            private final Iterator<TimeValue> retries;

            RetryHelper() {
                this.retries = RemoteScrollableHitSource.this.backoffPolicy.iterator();
            }

            protected void doRun() throws Exception {
                RemoteScrollableHitSource.this.client.performRequestAsync(method, uri, params, entity, new ResponseListener(){

                    public void onSuccess(Response response) {
                        Object parsedResponse;
                        ctx.restore();
                        try {
                            HttpEntity responseEntity = response.getEntity();
                            InputStream content = responseEntity.getContent();
                            XContentType xContentType = null;
                            if (responseEntity.getContentType() != null) {
                                xContentType = XContentType.fromMediaTypeOrFormat((String)responseEntity.getContentType().getValue());
                            }
                            if (xContentType == null) {
                                xContentType = XContentFactory.xContentType((InputStream)content);
                            }
                            try (XContentParser xContentParser = xContentType.xContent().createParser(content);){
                                parsedResponse = parser.apply(xContentParser, () -> ParseFieldMatcher.STRICT);
                            }
                        }
                        catch (IOException e) {
                            throw new ElasticsearchException("Error deserializing response", (Throwable)e, new Object[0]);
                        }
                        listener.accept(parsedResponse);
                    }

                    public void onFailure(Exception e) {
                        if (e instanceof ResponseException) {
                            ResponseException re = (ResponseException)e;
                            if (RestStatus.TOO_MANY_REQUESTS.getStatus() == re.getResponse().getStatusLine().getStatusCode() && retries.hasNext()) {
                                TimeValue delay = (TimeValue)retries.next();
                                RemoteScrollableHitSource.this.logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", (Object)delay), (Throwable)e);
                                RemoteScrollableHitSource.this.countSearchRetry.run();
                                RemoteScrollableHitSource.this.threadPool.schedule(delay, "same", (Runnable)((Object)this));
                                return;
                            }
                            e = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(re.getResponse().getStatusLine().getStatusCode(), re.getResponse().getEntity(), (Exception)re);
                        } else if (e instanceof ContentTooLongException) {
                            e = new IllegalArgumentException("Remote responded with a chunk that was too large. Use a smaller batch size.", e);
                        }
                        RemoteScrollableHitSource.this.fail.accept(e);
                    }
                }, new Header[0]);
            }

            public void onFailure(Exception t) {
                RemoteScrollableHitSource.this.fail.accept(t);
            }
        }
        new RetryHelper().run();
    }

    static ElasticsearchStatusException wrapExceptionToPreserveStatus(int statusCode, @Nullable HttpEntity entity, Exception cause) {
        String message;
        RestStatus status = RestStatus.fromCode((int)statusCode);
        String messagePrefix = "";
        if (status == null) {
            messagePrefix = "Couldn't extract status [" + statusCode + "]. ";
            status = RestStatus.INTERNAL_SERVER_ERROR;
        }
        if (entity == null) {
            message = messagePrefix + "No error body.";
        } else {
            try {
                message = messagePrefix + "body=" + EntityUtils.toString((HttpEntity)entity);
            }
            catch (IOException ioe) {
                ElasticsearchStatusException e = new ElasticsearchStatusException(messagePrefix + "Failed to extract body.", status, (Throwable)cause, new Object[0]);
                e.addSuppressed((Throwable)ioe);
                return e;
            }
        }
        return new ElasticsearchStatusException(message, status, (Throwable)cause, new Object[0]);
    }
}

