/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskAction;
import org.slf4j.Logger;

/**
 * Collection of assigned {@link StreamTask}s that adds commit, process and punctuate
 * passes over the running tasks, plus lookup of a restoring task by partition.
 *
 * <p>The {@code running} and {@code restoringByPartition} maps as well as
 * {@code applyToRunningTasks} and {@code closeZombieTask} are inherited from
 * {@link AssignedTasks}.
 */
class AssignedStreamsTasks
extends AssignedTasks<StreamTask>
implements RestoringTasks {
    private final Logger log;
    /** Action applied to each running task by {@link #maybeCommit()}. */
    private final TaskAction<StreamTask> maybeCommitAction;
    /** Number of tasks committed during the current {@link #maybeCommit()} pass. */
    private int committed = 0;

    /**
     * @param logContext supplies the logger for this class and is forwarded to the superclass
     */
    AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
        this.log = logContext.logger(this.getClass());
        this.maybeCommitAction = new TaskAction<StreamTask>() {

            @Override
            public String name() {
                return "maybeCommit";
            }

            @Override
            public void apply(StreamTask task) {
                if (task.commitNeeded()) {
                    // The counter is bumped before the commit; if commit() throws, the
                    // exception propagates out of maybeCommit() and the count is not returned.
                    AssignedStreamsTasks.this.committed++;
                    task.commit();
                    AssignedStreamsTasks.this.log.debug("Committed active task {} per user request", task.id());
                }
            }
        };
    }

    /**
     * Looks up the task registered in {@code restoringByPartition} for the given partition.
     *
     * @param partition the partition to look up
     * @return whatever the map holds for {@code partition}
     */
    @Override
    public StreamTask restoringTaskFor(TopicPartition partition) {
        return (StreamTask)this.restoringByPartition.get(partition);
    }

    /**
     * Applies the commit action to the running tasks; each task whose
     * {@code commitNeeded()} returns {@code true} is committed.
     *
     * @return the number of tasks committed during this call
     */
    int maybeCommit() {
        // Reset first: the anonymous action reports its work through this field.
        this.committed = 0;
        this.applyToRunningTasks(this.maybeCommitAction);
        return this.committed;
    }

    /**
     * Merges the {@code purgableOffsets()} of every running task into one map.
     * If two tasks report the same partition, the task visited later wins.
     *
     * @return a new mutable map from partition to offset
     */
    Map<TopicPartition, Long> recordsToDelete() {
        Map<TopicPartition, Long> recordsToDelete = new HashMap<TopicPartition, Long>();
        for (StreamTask task : this.running.values()) {
            recordsToDelete.putAll(task.purgableOffsets());
        }
        return recordsToDelete;
    }

    /**
     * Calls {@code process()} once on each running task.
     *
     * <p>If a task throws {@link TaskMigratedException} it is closed via
     * {@code closeZombieTask}; when that returns a non-null exception, that exception is
     * thrown and the task stays in {@code running}; otherwise the task is removed from
     * {@code running} and the original exception is rethrown. Any other
     * {@link RuntimeException} is logged and rethrown. In all cases the pass stops at the
     * first failing task.
     *
     * @return the number of tasks whose {@code process()} returned {@code true}
     */
    int process() {
        int processed = 0;
        // Iterate the values view: Iterator.remove() on it drops the map entry as well.
        Iterator<StreamTask> it = this.running.values().iterator();
        while (it.hasNext()) {
            StreamTask task = it.next();
            try {
                if (task.process()) {
                    processed++;
                }
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to process stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException e) {
                this.log.error("Failed to process stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /**
     * Calls {@code maybePunctuateStreamTime()} and then {@code maybePunctuateSystemTime()}
     * on each running task.
     *
     * <p>{@link TaskMigratedException} is handled as in {@link #process()}. Of the remaining
     * exceptions only {@link KafkaException} is logged here before being rethrown; other
     * runtime exceptions propagate without being logged by this method.
     *
     * @return the number of punctuate calls that returned {@code true} (up to two per task)
     */
    int punctuate() {
        int punctuated = 0;
        // Iterate the values view: Iterator.remove() on it drops the map entry as well.
        Iterator<StreamTask> it = this.running.values().iterator();
        while (it.hasNext()) {
            StreamTask task = it.next();
            try {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (KafkaException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return punctuated;
    }
}

