/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;

public class TestProcessingTimeService
implements TimerService {
    private volatile long currentTime = Long.MIN_VALUE;
    private volatile boolean isTerminated;
    private volatile boolean isQuiesced;
    private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue = new PriorityQueue<Tuple2<Long, CallbackTask>>(16, new Comparator<Tuple2<Long, CallbackTask>>(){

        @Override
        public int compare(Tuple2<Long, CallbackTask> o1, Tuple2<Long, CallbackTask> o2) {
            return Long.compare((Long)o1.f0, (Long)o2.f0);
        }
    });

    public void setCurrentTime(long timestamp) throws Exception {
        this.currentTime = timestamp;
        if (!this.isQuiesced) {
            while (!this.priorityQueue.isEmpty() && this.currentTime >= (Long)this.priorityQueue.peek().f0) {
                Tuple2<Long, CallbackTask> entry = this.priorityQueue.poll();
                CallbackTask callbackTask = (CallbackTask)entry.f1;
                if (callbackTask.isDone()) continue;
                callbackTask.onProcessingTime((Long)entry.f0);
                if (!(callbackTask instanceof PeriodicCallbackTask)) continue;
                this.priorityQueue.offer((Tuple2<Long, CallbackTask>)Tuple2.of((Object)((PeriodicCallbackTask)callbackTask).nextTimestamp((Long)entry.f0), (Object)callbackTask));
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return this.currentTime;
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        if (this.isTerminated) {
            throw new IllegalStateException("terminated");
        }
        if (this.isQuiesced) {
            return new CallbackTask(null);
        }
        CallbackTask callbackTask = new CallbackTask(target);
        this.priorityQueue.offer((Tuple2<Long, CallbackTask>)Tuple2.of((Object)timestamp, (Object)callbackTask));
        return callbackTask;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        if (this.isTerminated) {
            throw new IllegalStateException("terminated");
        }
        if (this.isQuiesced) {
            return new CallbackTask(null);
        }
        PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period);
        this.priorityQueue.offer((Tuple2<Long, CallbackTask>)Tuple2.of((Object)(this.currentTime + initialDelay), (Object)periodicCallbackTask));
        return periodicCallbackTask;
    }

    @Override
    public boolean isTerminated() {
        return this.isTerminated;
    }

    @Override
    public void quiesce() {
        if (!this.isTerminated) {
            this.isQuiesced = true;
            this.priorityQueue.clear();
        }
    }

    @Override
    public void awaitPendingAfterQuiesce() throws InterruptedException {
    }

    @Override
    public void shutdownService() {
        this.isTerminated = true;
    }

    @Override
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        this.shutdownService();
        return true;
    }

    public int getNumActiveTimers() {
        int count = 0;
        for (Tuple2<Long, CallbackTask> entry : this.priorityQueue) {
            if (((CallbackTask)entry.f1).isDone()) continue;
            ++count;
        }
        return count;
    }

    public Set<Long> getActiveTimerTimestamps() {
        HashSet<Long> actualTimestamps = new HashSet<Long>();
        for (Tuple2<Long, CallbackTask> entry : this.priorityQueue) {
            if (((CallbackTask)entry.f1).isDone()) continue;
            actualTimestamps.add((Long)entry.f0);
        }
        return actualTimestamps;
    }

    private static class PeriodicCallbackTask
    extends CallbackTask {
        private final long period;

        private PeriodicCallbackTask(ProcessingTimeCallback processingTimeCallback, long period) {
            super(processingTimeCallback);
            Preconditions.checkArgument((period > 0L ? 1 : 0) != 0, (Object)"The period must be greater than 0.");
            this.period = period;
        }

        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            this.processingTimeCallback.onProcessingTime(timestamp);
        }

        public long nextTimestamp(long currentTimestamp) {
            return currentTimestamp + this.period;
        }
    }

    private static class CallbackTask
    implements ScheduledFuture<Object> {
        protected final ProcessingTimeCallback processingTimeCallback;
        private AtomicReference<CallbackTaskState> state = new AtomicReference<CallbackTaskState>(CallbackTaskState.CREATED);

        private CallbackTask(ProcessingTimeCallback processingTimeCallback) {
            this.processingTimeCallback = processingTimeCallback;
        }

        public void onProcessingTime(long timestamp) throws Exception {
            this.processingTimeCallback.onProcessingTime(timestamp);
            this.state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.DONE);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            throw new UnsupportedOperationException();
        }

        @Override
        public int compareTo(Delayed o) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return this.state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.CANCELLED);
        }

        @Override
        public boolean isCancelled() {
            return this.state.get() == CallbackTaskState.CANCELLED;
        }

        @Override
        public boolean isDone() {
            return this.state.get() != CallbackTaskState.CREATED;
        }

        @Override
        public Object get() throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new UnsupportedOperationException();
        }

        static enum CallbackTaskState {
            CREATED,
            CANCELLED,
            DONE;

        }
    }
}

