package org.apache.flink.runtime.heartbeat;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * Heartbeat manager that tracks a set of heartbeat targets, each identified by a
 * {@link ResourceID}, and informs a {@link HeartbeatListener} when a monitored target has not
 * reported a heartbeat within the configured timeout.
 *
 * <p>Every monitored target is backed by a {@link HeartbeatMonitor}, which owns the timeout
 * task scheduled on the {@link ScheduledExecutor}.
 *
 * @param <I> type of the payload received with incoming heartbeats
 * @param <O> type of the payload sent to monitored targets
 */
@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
    /** Timeout in milliseconds handed to every {@link HeartbeatMonitor} created by this manager. */
    private final long heartbeatTimeoutIntervalMs;

    /** Resource ID this manager uses to identify itself when answering heartbeat requests. */
    private final ResourceID ownResourceID;

    /** Listener receiving payloads and timeout notifications. */
    private final HeartbeatListener<I, O> heartbeatListener;

    /** Executor on which the heartbeat timeouts are scheduled. */
    private final ScheduledExecutor scheduledExecutor;

    protected final Logger log;

    /** Currently monitored targets, keyed by their resource ID. */
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

    /** Executor used to send heartbeat responses once the outgoing payload is available. */
    private final Executor executor;

    /** Set by {@link #stop()}; once true, all mutating operations become no-ops. */
    protected volatile boolean stopped;

    /**
     * Monitor for a single heartbeat target. It schedules itself as the timeout task; when the
     * task runs while the monitor is still {@link State#RUNNING}, the listener is notified of
     * the timeout.
     *
     * <p>Intended for use within this package.
     *
     * @param <O> type of the payload sent to the monitored target
     */
    public static class HeartbeatMonitor<O> implements Runnable {
        private final ResourceID resourceID;
        private final HeartbeatTarget<O> heartbeatTarget;
        private final ScheduledExecutor scheduledExecutor;
        private final HeartbeatListener<?, ?> heartbeatListener;
        private final long heartbeatTimeoutIntervalMs;

        /** Handle of the currently scheduled timeout task; null until the first scheduling. */
        private volatile ScheduledFuture<?> futureTimeout;

        private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

        /** Wall-clock time (ms) of the last reported heartbeat; 0 if none was reported yet. */
        private volatile long lastHeartbeat;

        /** Life-cycle of a monitor: RUNNING moves exactly once to either TIMEOUT or CANCELED. */
        public enum State {
            RUNNING,
            TIMEOUT,
            CANCELED
        }

        /**
         * Creates the monitor and immediately schedules the first timeout.
         *
         * @param resourceID ID of the monitored target
         * @param heartbeatTarget target to which heartbeats can be sent
         * @param scheduledExecutor executor used to schedule the timeout task
         * @param heartbeatListener listener notified when the timeout fires
         * @param heartbeatTimeoutIntervalMs timeout in milliseconds; must be larger than 0
         */
        HeartbeatMonitor(
                ResourceID resourceID,
                HeartbeatTarget<O> heartbeatTarget,
                ScheduledExecutor scheduledExecutor,
                HeartbeatListener<?, O> heartbeatListener,
                long heartbeatTimeoutIntervalMs) {
            this.resourceID = Preconditions.checkNotNull(resourceID);
            this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
            this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
            this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
            // Strictly positive, matching both the message and the check in the manager's constructor.
            Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0.");
            this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
            this.lastHeartbeat = 0L;
            resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
        }

        /** Returns the target monitored by this instance. */
        public HeartbeatTarget<O> getHeartbeatTarget() {
            return this.heartbeatTarget;
        }

        /** Returns the resource ID of the monitored target. */
        public ResourceID getHeartbeatTargetId() {
            return this.resourceID;
        }

        /** Returns the time (ms) of the last reported heartbeat, or 0 if none was reported. */
        public long getLastHeartbeat() {
            return this.lastHeartbeat;
        }

        /** Records a heartbeat at the current time and restarts the timeout. */
        void reportHeartbeat() {
            this.lastHeartbeat = System.currentTimeMillis();
            resetHeartbeatTimeout(this.heartbeatTimeoutIntervalMs);
        }

        /**
         * Cancels the pending timeout and schedules a new one, provided the monitor is still
         * running.
         *
         * @param timeoutMs delay in milliseconds until the new timeout fires
         */
        void resetHeartbeatTimeout(long timeoutMs) {
            if (this.state.get() == State.RUNNING) {
                cancelTimeout();
                this.futureTimeout = this.scheduledExecutor.schedule(this, timeoutMs, TimeUnit.MILLISECONDS);
                // The state may have changed while we were scheduling; if so, the task that was
                // just scheduled must not stay behind.
                if (this.state.get() != State.RUNNING) {
                    cancelTimeout();
                }
            }
        }

        /** Moves the monitor from RUNNING to CANCELED and cancels the pending timeout. */
        void cancel() {
            if (this.state.compareAndSet(State.RUNNING, State.CANCELED)) {
                cancelTimeout();
            }
        }

        private void cancelTimeout() {
            // Read the volatile field once so the null check and the call see the same future.
            final ScheduledFuture<?> pendingTimeout = this.futureTimeout;
            if (pendingTimeout != null) {
                pendingTimeout.cancel(true);
            }
        }

        /** Returns true if {@link #cancel()} won the state transition. */
        public boolean isCanceled() {
            return this.state.get() == State.CANCELED;
        }

        /** Timeout task: notifies the listener unless the monitor was canceled or already timed out. */
        @Override
        public void run() {
            if (this.state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
                this.heartbeatListener.notifyHeartbeatTimeout(this.resourceID);
            }
        }
    }

    /**
     * Creates a heartbeat manager.
     *
     * @param heartbeatTimeoutIntervalMs timeout in milliseconds for monitored targets; must be
     *     larger than 0
     * @param resourceID resource ID identifying this manager towards its targets
     * @param heartbeatListener listener for payloads and timeouts
     * @param executor executor used to send heartbeat responses asynchronously
     * @param scheduledExecutor executor used to schedule heartbeat timeouts
     * @param logger logger used by this manager
     */
    public HeartbeatManagerImpl(
            long heartbeatTimeoutIntervalMs,
            ResourceID resourceID,
            HeartbeatListener<I, O> heartbeatListener,
            Executor executor,
            ScheduledExecutor scheduledExecutor,
            Logger logger) {
        Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0, "The heartbeat timeout has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        this.ownResourceID = Preconditions.checkNotNull(resourceID);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.log = Preconditions.checkNotNull(logger);
        this.executor = Preconditions.checkNotNull(executor);
        this.heartbeatTargets = new ConcurrentHashMap<>(16);
        this.stopped = false;
    }

    /** Returns the resource ID of this manager. Intended for use within this package. */
    public ResourceID getOwnResourceID() {
        return this.ownResourceID;
    }

    /** Returns the executor used for asynchronous responses. Intended for use within this package. */
    public Executor getExecutor() {
        return this.executor;
    }

    /** Returns the heartbeat listener. Intended for use within this package. */
    public HeartbeatListener<I, O> getHeartbeatListener() {
        return this.heartbeatListener;
    }

    /**
     * Returns a live view of the monitors of all currently monitored targets. Intended for use
     * within this package.
     */
    public Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
        return this.heartbeatTargets.values();
    }

    /**
     * Starts monitoring the given target. Does nothing if the manager is stopped or the
     * resource ID is already monitored.
     *
     * @param resourceID ID of the target to monitor
     * @param heartbeatTarget target to which heartbeats can be sent
     */
    @Override
    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (this.stopped) {
            return;
        }
        // Fast path: avoids creating a monitor (which schedules a timeout) for a known target.
        if (this.heartbeatTargets.containsKey(resourceID)) {
            this.log.info("The target with resource ID {} is already been monitored.", resourceID);
            return;
        }
        HeartbeatMonitor<O> heartbeatMonitor =
                new HeartbeatMonitor<>(
                        resourceID,
                        heartbeatTarget,
                        this.scheduledExecutor,
                        this.heartbeatListener,
                        this.heartbeatTimeoutIntervalMs);
        // Atomic insert: if another thread registered the same ID in the meantime, keep its
        // monitor and cancel ours so that no orphaned timeout task is left scheduled.
        if (this.heartbeatTargets.putIfAbsent(resourceID, heartbeatMonitor) != null) {
            heartbeatMonitor.cancel();
            this.log.info("The target with resource ID {} is already been monitored.", resourceID);
            return;
        }
        // stop() may have run concurrently after the first check; undo our own registration only.
        if (this.stopped) {
            heartbeatMonitor.cancel();
            this.heartbeatTargets.remove(resourceID, heartbeatMonitor);
        }
    }

    /**
     * Stops monitoring the given target and cancels its timeout. Does nothing if the manager
     * is stopped or the target is not monitored.
     *
     * @param resourceID ID of the target to stop monitoring
     */
    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        if (this.stopped) {
            return;
        }
        HeartbeatMonitor<O> removedMonitor = this.heartbeatTargets.remove(resourceID);
        if (removedMonitor != null) {
            removedMonitor.cancel();
        }
    }

    /** Marks the manager as stopped, cancels every monitor and forgets all targets. */
    @Override
    public void stop() {
        this.stopped = true;
        for (HeartbeatMonitor<O> heartbeatMonitor : this.heartbeatTargets.values()) {
            heartbeatMonitor.cancel();
        }
        this.heartbeatTargets.clear();
    }

    /**
     * Returns the time of the last heartbeat received from the given target.
     *
     * @param resourceID ID of the target
     * @return the last heartbeat time in milliseconds, or -1 if the target is not monitored
     */
    @Override
    public long getLastHeartbeatFrom(ResourceID resourceID) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
        return heartbeatMonitor != null ? heartbeatMonitor.getLastHeartbeat() : -1L;
    }

    /**
     * Handles an incoming heartbeat: resets the sender's timeout and forwards a non-null
     * payload to the listener. Ignored when the manager is stopped.
     *
     * @param resourceID ID of the heartbeat sender
     * @param heartbeatPayload payload sent along with the heartbeat; may be null
     */
    @Override
    public void receiveHeartbeat(ResourceID resourceID, I heartbeatPayload) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat from {}.", resourceID);
        reportHeartbeat(resourceID);
        if (heartbeatPayload != null) {
            this.heartbeatListener.reportPayload(resourceID, heartbeatPayload);
        }
    }

    /**
     * Handles an incoming heartbeat request: resets the requester's timeout, forwards a
     * non-null payload to the listener and answers with a heartbeat carrying the payload
     * retrieved from the listener. Requests from unmonitored senders are ignored, as are all
     * requests once the manager is stopped.
     *
     * @param resourceID ID of the requester
     * @param heartbeatPayload payload sent along with the request; may be null
     */
    @Override
    public void requestHeartbeat(ResourceID resourceID, I heartbeatPayload) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat request from {}.", resourceID);
        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(resourceID);
        if (heartbeatTarget == null) {
            return;
        }
        if (heartbeatPayload != null) {
            this.heartbeatListener.reportPayload(resourceID, heartbeatPayload);
        }
        CompletableFuture<O> futurePayload = this.heartbeatListener.retrievePayload(resourceID);
        if (futurePayload != null) {
            futurePayload
                    .thenAcceptAsync(
                            retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload),
                            this.executor)
                    .exceptionally(
                            failure -> {
                                this.log.warn("Could not send heartbeat to target with id {}.", resourceID, failure);
                                return null;
                            });
        } else {
            // No payload available: still answer, with an empty heartbeat.
            heartbeatTarget.receiveHeartbeat(this.ownResourceID, null);
        }
    }

    /**
     * Registers a heartbeat for the given target and restarts its timeout.
     *
     * @param resourceID ID of the target that sent the heartbeat
     * @return the heartbeat target, or null if the resource ID is not monitored
     */
    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        // Single lookup: a containsKey/get pair could observe a concurrent removal in between
        // and dereference null.
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
        if (heartbeatMonitor == null) {
            return null;
        }
        heartbeatMonitor.reportHeartbeat();
        return heartbeatMonitor.getHeartbeatTarget();
    }
}
