/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class SubscriptionStateTest {
    /** Subscription state under test; non-final because one test swaps in a different reset strategy. */
    private SubscriptionState state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
    /** Name of the primary topic used by the partition fixtures. */
    private final String topic = "test";
    /** Name of the secondary topic used by the partition fixtures. */
    private final String topic1 = "test1";
    // Partition fixtures are built from the topic constants so the names are defined in one place.
    private final TopicPartition tp0 = new TopicPartition(topic, 0);
    private final TopicPartition tp1 = new TopicPartition(topic, 1);
    private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
    /** Listener handed to every subscribe call in these tests. */
    private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
    /** Leader/epoch fixture with no node and no epoch. */
    private final Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty());

    /**
     * Assigns a single partition manually, seeks it, and then replaces the assignment with an
     * empty set, asserting the assigned/fetchable state after each step.
     */
    @Test
    public void partitionAssignment() {
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        Assert.assertEquals(onlyTp0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
        // No position has been set yet.
        Assert.assertFalse(state.hasAllFetchPositions());

        state.seek(tp0, 1L);
        Assert.assertTrue(state.isFetchable(tp0));
        Assert.assertEquals(1L, state.position(tp0).offset);

        // Replacing the assignment with nothing drops tp0 entirely.
        state.assignFromUser(Collections.emptySet());
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());
        Assert.assertFalse(state.isAssigned(tp0));
        Assert.assertFalse(state.isFetchable(tp0));
    }

    /**
     * Walks through manual assignment, unsubscribe, topic subscription, and re-subscription,
     * asserting the assigned partition set and its count after every transition.
     */
    @Test
    public void partitionAssignmentChangeOnTopicSubscription() {
        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
        Assert.assertEquals(2, state.assignedPartitions().size());
        Assert.assertEquals(2, state.numAssignedPartitions());
        Assert.assertTrue(state.assignedPartitions().contains(tp0));
        Assert.assertTrue(state.assignedPartitions().contains(tp1));

        state.unsubscribe();
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());

        // Subscribing alone must not assign anything.
        state.subscribe(Collections.singleton(topic1), rebalanceListener);
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());

        Set<TopicPartition> onlyT1p0 = Collections.singleton(t1p0);
        Assert.assertTrue(state.assignFromSubscribed(onlyT1p0));
        Assert.assertEquals(onlyT1p0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        // Changing the subscription is expected to leave the current assignment in place.
        state.subscribe(Collections.singleton(topic), rebalanceListener);
        Assert.assertEquals(onlyT1p0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Exercises pattern subscription: asserts that neither subscribing by pattern nor updating the
     * matched topics changes the assignment, and that only {@code assignFromSubscribed} does.
     */
    @Test
    public void partitionAssignmentChangeOnPatternSubscription() {
        state.subscribe(Pattern.compile(".*"), rebalanceListener);
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());

        // Updating the matched topics alone must not assign anything.
        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());

        Set<String> onlyTopic = Collections.singleton(topic);
        Set<TopicPartition> onlyT1p0 = Collections.singleton(t1p0);

        Assert.assertTrue(state.assignFromSubscribed(Collections.singleton(tp1)));
        Assert.assertEquals(Collections.singleton(tp1), state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
        Assert.assertEquals(onlyTopic, state.subscription());

        Assert.assertTrue(state.assignFromSubscribed(Collections.singletonList(t1p0)));
        Assert.assertEquals(onlyT1p0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
        Assert.assertEquals(onlyTopic, state.subscription());

        // A new pattern is expected to leave the existing assignment untouched.
        state.subscribe(Pattern.compile(".*t"), rebalanceListener);
        Assert.assertEquals(onlyT1p0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        state.subscribeFromPattern(Collections.singleton(topic));
        Assert.assertEquals(onlyT1p0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        Assert.assertTrue(state.assignFromSubscribed(Collections.singletonList(tp0)));
        Assert.assertEquals(Collections.singleton(tp0), state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
        Assert.assertEquals(onlyTopic, state.subscription());

        state.unsubscribe();
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Asserts that the assignment id starts at 0 and is 1, 2 and 3 after a manual assignment,
     * an unsubscribe, and an assignment from a topic subscription respectively.
     */
    @Test
    public void verifyAssignmentId() {
        Assert.assertEquals(0, state.assignmentId());

        // Typed sets replace the raw Set locals, so no unchecked conversion is needed.
        Set<TopicPartition> userAssignment = Utils.mkSet(tp0, tp1);
        state.assignFromUser(userAssignment);
        Assert.assertEquals(1, state.assignmentId());
        Assert.assertEquals(userAssignment, state.assignedPartitions());

        state.unsubscribe();
        Assert.assertEquals(2, state.assignmentId());
        Assert.assertEquals(Collections.emptySet(), state.assignedPartitions());

        Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0);
        state.subscribe(Collections.singleton(topic1), rebalanceListener);
        Assert.assertTrue(state.assignFromSubscribed(autoAssignment));
        Assert.assertEquals(3, state.assignmentId());
        Assert.assertEquals(autoAssignment, state.assignedPartitions());
    }

    /**
     * Requests an offset reset on a positioned partition and asserts that it is not fetchable
     * until a subsequent seek, which also clears the reset-needed flag.
     */
    @Test
    public void partitionReset() {
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 5L);
        Assert.assertEquals(5L, state.position(tp0).offset);

        state.requestOffsetReset(tp0);
        Assert.assertFalse(state.isFetchable(tp0));
        Assert.assertTrue(state.isOffsetResetNeeded(tp0));
        // The position object is still expected to be present while the reset is pending.
        Assert.assertNotNull(state.position(tp0));

        // Seeking again is expected to cancel the pending reset.
        state.seek(tp0, 0L);
        Assert.assertTrue(state.isFetchable(tp0));
        Assert.assertFalse(state.isOffsetResetNeeded(tp0));
    }

    /**
     * Subscribes to a topic, assigns tp0 then tp1 from the subscription, and asserts that the
     * second assignment replaces the first and that tp1 is not fetchable.
     */
    @Test
    public void topicSubscription() {
        state.subscribe(Collections.singleton(topic), rebalanceListener);
        Assert.assertEquals(1, state.subscription().size());
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());
        Assert.assertTrue(state.partitionsAutoAssigned());

        Assert.assertTrue(state.assignFromSubscribed(Collections.singleton(tp0)));
        state.seek(tp0, 1L);
        Assert.assertEquals(1L, state.position(tp0).offset);

        // Assigning tp1 replaces tp0; tp1 has not been seeked, so it is not fetchable.
        Set<TopicPartition> onlyTp1 = Collections.singleton(tp1);
        Assert.assertTrue(state.assignFromSubscribed(onlyTp1));
        Assert.assertTrue(state.isAssigned(tp1));
        Assert.assertFalse(state.isAssigned(tp0));
        Assert.assertFalse(state.isFetchable(tp1));
        Assert.assertEquals(onlyTp1, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
    }

    /**
     * Asserts that a positioned partition is fetchable, is not fetchable after being paused,
     * and is fetchable again after being resumed.
     */
    @Test
    public void partitionPause() {
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 100L);
        Assert.assertTrue(state.isFetchable(tp0));

        state.pause(tp0);
        Assert.assertFalse(state.isFetchable(tp0));

        state.resume(tp0);
        Assert.assertTrue(state.isFetchable(tp0));
    }

    /**
     * Subscribes, assigns tp0 from the subscription, and then updates its position without a
     * prior seek. The test passes when an {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void invalidPositionUpdate() {
        state.subscribe(Collections.singleton(topic), rebalanceListener);
        Assert.assertTrue(state.assignFromSubscribed(Collections.singleton(tp0)));

        SubscriptionState.FetchPosition unseekedUpdate =
                new SubscriptionState.FetchPosition(0L, Optional.empty(), leaderAndEpoch);
        state.position(tp0, unseekedUpdate);
    }

    /**
     * Asserts that {@code assignFromSubscribed} returns false for a partition whose topic is
     * not in the current topic subscription.
     */
    @Test
    public void cantAssignPartitionForUnsubscribedTopics() {
        state.subscribe(Collections.singleton(topic), rebalanceListener);

        boolean accepted = state.assignFromSubscribed(Collections.singletonList(t1p0));
        Assert.assertFalse(accepted);
    }

    /**
     * Asserts that {@code assignFromSubscribed} returns false for a partition of "test1" when the
     * pattern subscription is ".*t" and the matched topics contain only "test".
     */
    @Test
    public void cantAssignPartitionForUnmatchedPattern() {
        state.subscribe(Pattern.compile(".*t"), rebalanceListener);
        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));

        boolean accepted = state.assignFromSubscribed(Collections.singletonList(t1p0));
        Assert.assertFalse(accepted);
    }

    /**
     * Updates the position of tp0 without assigning it first. The test passes when an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void cantChangePositionForNonAssignedPartition() {
        SubscriptionState.FetchPosition update =
                new SubscriptionState.FetchPosition(1L, Optional.empty(), leaderAndEpoch);
        state.position(tp0, update);
    }

    /**
     * Subscribes by topic and then by pattern on the same state. The test passes when an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void cantSubscribeTopicAndPattern() {
        state.subscribe(Collections.singleton(topic), rebalanceListener);

        Pattern anyTopic = Pattern.compile(".*");
        state.subscribe(anyTopic, rebalanceListener);
    }

    /**
     * Assigns a partition manually and then subscribes by pattern on the same state. The test
     * passes when an {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void cantSubscribePartitionAndPattern() {
        state.assignFromUser(Collections.singleton(tp0));

        Pattern anyTopic = Pattern.compile(".*");
        state.subscribe(anyTopic, rebalanceListener);
    }

    /**
     * Subscribes by pattern and then by topic on the same state. The test passes when an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void cantSubscribePatternAndTopic() {
        Pattern anyTopic = Pattern.compile(".*");
        state.subscribe(anyTopic, rebalanceListener);

        state.subscribe(Collections.singleton(topic), rebalanceListener);
    }

    /**
     * Subscribes by pattern and then assigns a partition manually on the same state. The test
     * passes when an {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void cantSubscribePatternAndPartition() {
        Pattern anyTopic = Pattern.compile(".*");
        state.subscribe(anyTopic, rebalanceListener);

        state.assignFromUser(Collections.singleton(tp0));
    }

    /**
     * Subscribes with the pattern ".*", supplies two matched topics, and asserts that the
     * subscription then contains two entries.
     */
    @Test
    public void patternSubscription() {
        state.subscribe(Pattern.compile(".*"), rebalanceListener);

        Set<String> matchedTopics = new HashSet<>(Arrays.asList(topic, topic1));
        state.subscribeFromPattern(matchedTopics);

        Assert.assertEquals("Expected subscribed topics count is incorrect", 2, state.subscription().size());
    }

    /**
     * Asserts that after a manual assignment is cleared with {@code unsubscribe}, a topic
     * subscription can be made and is reported by {@code subscription()}.
     */
    @Test
    public void unsubscribeUserAssignment() {
        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
        state.unsubscribe();

        Set<String> onlyTopic = Collections.singleton(topic);
        state.subscribe(onlyTopic, rebalanceListener);
        Assert.assertEquals(onlyTopic, state.subscription());
    }

    /**
     * Asserts that after a topic subscription is cleared with {@code unsubscribe}, a manual
     * assignment can be made and is reported as the assigned partition set.
     */
    @Test
    public void unsubscribeUserSubscribe() {
        state.subscribe(Collections.singleton(topic), rebalanceListener);
        state.unsubscribe();

        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        Assert.assertEquals(onlyTp0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());
    }

    /**
     * Asserts that {@code unsubscribe} empties both the subscription and the assignment, first
     * after a pattern subscription and then after a manual assignment.
     */
    @Test
    public void unsubscription() {
        // Pattern subscription followed by unsubscribe.
        state.subscribe(Pattern.compile(".*"), rebalanceListener);
        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
        Set<TopicPartition> onlyTp1 = Collections.singleton(tp1);
        Assert.assertTrue(state.assignFromSubscribed(onlyTp1));
        Assert.assertEquals(onlyTp1, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assert.assertEquals(0, state.subscription().size());
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());

        // Manual assignment followed by unsubscribe.
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        Assert.assertEquals(onlyTp0, state.assignedPartitions());
        Assert.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assert.assertEquals(0, state.subscription().size());
        Assert.assertTrue(state.assignedPartitions().isEmpty());
        Assert.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Asserts the preferred-read-replica behaviour around the value returned by the supplier
     * passed to {@code updatePreferredReadReplica}: the replica is reported for query times up to
     * and including that value and absent after it, absent after an explicit clear, and
     * replaced by a later update.
     */
    @Test
    public void testPreferredReadReplicaLease() {
        state.assignFromUser(Collections.singleton(tp0));

        // Nothing set yet.
        Assert.assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());

        // Replica 42 with supplier value 10: present at 9 and 10, absent at 11.
        // assertEquals takes (expected, actual); the expected replica id goes first.
        state.updatePreferredReadReplica(tp0, 42, () -> 10L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value -> Assert.assertEquals(42, value.intValue()));
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value -> Assert.assertEquals(42, value.intValue()));
        Assert.assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());

        // After an explicit clear nothing is reported, regardless of the query time.
        state.clearPreferredReadReplica(tp0);
        Assert.assertFalse(state.preferredReadReplica(tp0, 9L).isPresent());
        Assert.assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());

        // Replica 43 with supplier value 20.
        state.updatePreferredReadReplica(tp0, 43, () -> 20L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value -> Assert.assertEquals(43, value.intValue()));
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value -> Assert.assertEquals(43, value.intValue()));
        Assert.assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());

        // Updating again (replica 44, supplier value 30) overrides the previous entry.
        state.updatePreferredReadReplica(tp0, 44, () -> 30L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value -> Assert.assertEquals(44, value.intValue()));
        Assert.assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
    }

    /**
     * Seeks with a position that carries no offset epoch and asserts that the position is valid
     * and not awaiting validation, both immediately and after each
     * {@code maybeValidatePositionForCurrentLeader} call (which is asserted to return false).
     */
    @Test
    public void testSeekUnvalidatedWithNoOffsetEpoch() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition noOffsetEpoch = new SubscriptionState.FetchPosition(
                0L, Optional.empty(), new Metadata.LeaderAndEpoch(broker1, Optional.of(5)));
        state.seekUnvalidated(tp0, noOffsetEpoch);
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));

        // Current leader without an epoch.
        Metadata.LeaderAndEpoch leaderWithoutEpoch = new Metadata.LeaderAndEpoch(broker1, Optional.empty());
        Assert.assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, leaderWithoutEpoch));
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));

        // Current leader with epoch 10.
        Metadata.LeaderAndEpoch leaderAtEpoch10 = new Metadata.LeaderAndEpoch(broker1, Optional.of(10));
        Assert.assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, leaderAtEpoch10));
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Seeks first with an offset epoch (asserting the partition is awaiting validation) and then
     * without one, asserting that the second seek leaves the position valid and not awaiting
     * validation.
     */
    @Test
    public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition withOffsetEpoch = new SubscriptionState.FetchPosition(
                0L, Optional.of(2), new Metadata.LeaderAndEpoch(broker1, Optional.of(5)));
        state.seekUnvalidated(tp0, withOffsetEpoch);
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));

        SubscriptionState.FetchPosition withoutOffsetEpoch = new SubscriptionState.FetchPosition(
                0L, Optional.empty(), new Metadata.LeaderAndEpoch(broker1, Optional.of(5)));
        state.seekUnvalidated(tp0, withoutOffsetEpoch);
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Seeks with an offset epoch and asserts that the partition stays in the awaiting-validation
     * state across leader updates that carry an epoch, and leaves it once the current leader
     * has no epoch.
     */
    @Test
    public void testSeekUnvalidatedWithOffsetEpoch() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition withOffsetEpoch = new SubscriptionState.FetchPosition(
                0L, Optional.of(2), new Metadata.LeaderAndEpoch(broker1, Optional.of(5)));
        state.seekUnvalidated(tp0, withOffsetEpoch);
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Leader epoch 5: the call is asserted to return true and validation is still pending.
        Metadata.LeaderAndEpoch leaderAtEpoch5 = new Metadata.LeaderAndEpoch(broker1, Optional.of(5));
        Assert.assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, leaderAtEpoch5));
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Leader epoch 15: same expectation.
        Metadata.LeaderAndEpoch leaderAtEpoch15 = new Metadata.LeaderAndEpoch(broker1, Optional.of(15));
        Assert.assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, leaderAtEpoch15));
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Leader without an epoch: the call returns false and the position becomes valid.
        Metadata.LeaderAndEpoch leaderWithoutEpoch = new Metadata.LeaderAndEpoch(broker1, Optional.empty());
        Assert.assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, leaderWithoutEpoch));
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Asserts that {@code seekValidated} on a partition that is awaiting validation makes the
     * position valid and moves it to the newly supplied offset.
     */
    @Test
    public void testSeekValidatedShouldClearAwaitingValidation() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(broker1, Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));
        Assert.assertEquals(10L, state.position(tp0).offset);

        SubscriptionState.FetchPosition validated = new SubscriptionState.FetchPosition(
                8L, Optional.of(4), new Metadata.LeaderAndEpoch(broker1, Optional.of(10)));
        state.seekValidated(tp0, validated);
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));
        Assert.assertEquals(8L, state.position(tp0).offset);
    }

    /**
     * Asserts that {@code completeValidation} on a partition that is awaiting validation makes
     * the position valid while leaving its offset unchanged.
     */
    @Test
    public void testCompleteValidationShouldClearAwaitingValidation() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(broker1, Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assert.assertFalse(state.hasValidPosition(tp0));
        Assert.assertTrue(state.awaitingValidation(tp0));
        Assert.assertEquals(10L, state.position(tp0).offset);

        state.completeValidation(tp0);
        Assert.assertTrue(state.hasValidPosition(tp0));
        Assert.assertFalse(state.awaitingValidation(tp0));
        Assert.assertEquals(10L, state.position(tp0).offset);
    }

    /**
     * Asserts that requesting an offset reset on a partition that is awaiting validation takes
     * it out of that state and marks it as needing a reset.
     */
    @Test
    public void testOffsetResetWhileAwaitingValidation() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(broker1, Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assert.assertTrue(state.awaitingValidation(tp0));

        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        Assert.assertFalse(state.awaitingValidation(tp0));
        Assert.assertTrue(state.isOffsetResetNeeded(tp0));
    }

    /**
     * Completes validation with an end offset whose epoch equals the position's offset epoch and
     * whose offset is past the position. Asserts that no divergent offset is reported, the
     * partition is no longer awaiting validation, and the position is unchanged.
     */
    @Test
    public void testMaybeCompleteValidation() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        int currentEpoch = 10;
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;

        SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, initialPosition);
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Parameterized Optional instead of the raw type.
        Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
                new EpochEndOffset(initialOffsetEpoch, initialOffset + 5L));
        Assert.assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
        Assert.assertFalse(state.awaitingValidation(tp0));
        Assert.assertEquals(initialPosition, state.position(tp0));
    }

    /**
     * Changes the position after validation was started and then completes validation for the
     * original position. Asserts that no divergent offset is reported, the partition is still
     * awaiting validation, and the updated position is retained.
     */
    @Test
    public void testMaybeCompleteValidationAfterPositionChange() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        int currentEpoch = 10;
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;
        long updateOffset = 20L;
        int updateOffsetEpoch = 8;

        SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, initialPosition);
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Move the position before the validation response for the initial position arrives.
        SubscriptionState.FetchPosition updatePosition = new SubscriptionState.FetchPosition(updateOffset,
                Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, updatePosition);

        // Parameterized Optional instead of the raw type.
        Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
                new EpochEndOffset(initialOffsetEpoch, initialOffset + 5L));
        Assert.assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
        Assert.assertTrue(state.awaitingValidation(tp0));
        Assert.assertEquals(updatePosition, state.position(tp0));
    }

    /**
     * Requests an offset reset after validation was started and then completes validation for the
     * original position. Asserts that no divergent offset is reported, the partition is not
     * awaiting validation, and the reset is still needed.
     */
    @Test
    public void testMaybeCompleteValidationAfterOffsetReset() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        int currentEpoch = 10;
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;

        SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, initialPosition);
        Assert.assertTrue(state.awaitingValidation(tp0));

        state.requestOffsetReset(tp0);

        // Parameterized Optional instead of the raw type.
        Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
                new EpochEndOffset(initialOffsetEpoch, initialOffset + 5L));
        Assert.assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
        Assert.assertFalse(state.awaitingValidation(tp0));
        Assert.assertTrue(state.isOffsetResetNeeded(tp0));
    }

    /**
     * Completes validation with an end offset (epoch 7, offset 5) below the current position
     * while the state uses the EARLIEST reset strategy. Asserts that no divergent offset is
     * returned, validation is finished, and the position now equals the end offset and epoch.
     */
    @Test
    public void testTruncationDetectionWithResetPolicy() {
        Node broker1 = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        int currentEpoch = 10;
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;
        long divergentOffset = 5L;
        int divergentOffsetEpoch = 7;

        SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, initialPosition);
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Parameterized Optional instead of the raw type.
        Optional<OffsetAndMetadata> divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition,
                new EpochEndOffset(divergentOffsetEpoch, divergentOffset));
        Assert.assertEquals(Optional.empty(), divergentOffsetMetadata);
        Assert.assertFalse(state.awaitingValidation(tp0));

        SubscriptionState.FetchPosition updatedPosition = new SubscriptionState.FetchPosition(divergentOffset,
                Optional.of(divergentOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        Assert.assertEquals(updatedPosition, state.position(tp0));
    }

    /**
     * Same scenario as the reset-policy variant, but with a state created with
     * {@code OffsetResetStrategy.NONE}. Asserts that the divergent offset and epoch are returned
     * as an {@code OffsetAndMetadata} with empty metadata and that the partition is still
     * awaiting validation.
     */
    @Test
    public void testTruncationDetectionWithoutResetPolicy() {
        Node broker1 = new Node(1, "localhost", 9092);
        // Replace the default fixture, which uses EARLIEST.
        state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        state.assignFromUser(Collections.singleton(tp0));

        int currentEpoch = 10;
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;
        long divergentOffset = 5L;
        int divergentOffsetEpoch = 7;

        SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
                Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
        state.seekUnvalidated(tp0, initialPosition);
        Assert.assertTrue(state.awaitingValidation(tp0));

        // Parameterized Optional instead of the raw type.
        Optional<OffsetAndMetadata> divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition,
                new EpochEndOffset(divergentOffsetEpoch, divergentOffset));
        Assert.assertEquals(Optional.of(new OffsetAndMetadata(divergentOffset, Optional.of(divergentOffsetEpoch), "")),
                divergentOffsetMetadata);
        Assert.assertTrue(state.awaitingValidation(tp0));
    }

    private static class MockRebalanceListener
    implements ConsumerRebalanceListener {
        public Collection<TopicPartition> revoked;
        public Collection<TopicPartition> assigned;
        public int revokedCount = 0;
        public int assignedCount = 0;

        private MockRebalanceListener() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.assigned = partitions;
            ++this.assignedCount;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.revoked = partitions;
            ++this.revokedCount;
        }
    }
}

