/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.LeaseRenewer;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

class LeaseRenewerImpl
implements LeaseRenewer {
    private static final Logger logger = LoggerFactory.getLogger(LeaseRenewerImpl.class);
    private final LeaseManager leaseManager;
    private final Duration leaseRenewInterval;
    private Lease lease;
    private RuntimeException resultException;

    public LeaseRenewerImpl(Lease lease, LeaseManager leaseManager, Duration leaseRenewInterval) {
        this.lease = lease;
        this.leaseManager = leaseManager;
        this.leaseRenewInterval = leaseRenewInterval;
    }

    @Override
    public Mono<Void> run(CancellationToken cancellationToken) {
        logger.info("Lease with token {}: renewer task started.", (Object)this.lease.getLeaseToken());
        return Mono.just((Object)this).flatMap(value -> {
            if (cancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            Instant stopTimer = Instant.now().plus(this.leaseRenewInterval);
            return Mono.just((Object)value).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
                Instant currentTime = Instant.now();
                return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
            }).last();
        }).flatMap(value -> {
            if (cancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            return this.renew(cancellationToken);
        }).repeat(() -> {
            if (cancellationToken.isCancellationRequested()) {
                logger.info("Lease with token {}: renewer task stopped.", (Object)this.lease.getLeaseToken());
            }
            return !cancellationToken.isCancellationRequested();
        }).then().doOnError(throwable -> {
            if (throwable instanceof LeaseLostException) {
                logger.info("Lease with token {}: renew lease loop failed.", (Object)this.lease.getLeaseToken(), throwable);
            } else {
                logger.error("Lease with token {}: renew lease loop failed.", (Object)this.lease.getLeaseToken(), throwable);
            }
        });
    }

    @Override
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Lease> renew(CancellationToken cancellationToken) {
        if (cancellationToken.isCancellationRequested()) {
            return Mono.empty();
        }
        return this.leaseManager.renew(this.lease).map(renewedLease -> {
            if (renewedLease != null) {
                this.lease = renewedLease;
            }
            logger.info("Lease with token {}: renewed lease with result {}", (Object)this.lease.getLeaseToken(), (Object)(renewedLease != null ? 1 : 0));
            return renewedLease;
        }).onErrorResume(throwable -> {
            if (throwable instanceof LeaseLostException) {
                LeaseLostException lle = (LeaseLostException)throwable;
                this.resultException = lle;
                logger.error("Lease with token {} with lease token{}: lost lease on renew.", (Object)this.lease.getLeaseToken(), (Object)lle);
                return Mono.error((Throwable)lle);
            }
            logger.error("Lease with token {}: failed to renew lease.", (Object)this.lease.getLeaseToken(), throwable);
            return Mono.empty();
        });
    }
}

