package org.apache.nifi.processor.util.list;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.StringUtils;

/* loaded from: input_file:org/apache/nifi/processor/util/list/ListedEntityTracker.class */
public class ListedEntityTracker<T extends ListableEntity> {
    private final ObjectMapper objectMapper;
    private volatile Map<String, ListedEntity> alreadyListedEntities;
    private static final String NOTE = "Used by 'Tracking Entities' strategy.";
    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder().name("et-state-cache").displayName("Entity Tracking State Cache").description(String.format("Listed entities are stored in the specified cache storage so that this processor can resume listing across NiFi restart or in case of primary node change. 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'. To support large number of entities, the strategy uses DistributedMapCache instead of managed state. Cache key format is 'ListedEntities::{processorId}(::{nodeId})'. If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately. E.g. cluster wide cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b', per node cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3' The stored cache content is Gzipped JSON string. The cache key will be deleted when target listing configuration is changed. %s", NOTE)).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder().name("et-time-window").displayName("Entity Tracking Time Window").description(String.format("Specify how long this processor should track already-listed entities. 'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: 1. does not exist in the already-listed entities, 2. has newer timestamp than the cached entity, 3. has different size than the cached entity. If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. %s", NOTE)).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("3 hours").build();
    private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available", "Regardless of entities timestamp, all existing entities will be listed at the initial listing activity.");
    private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window", "Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity.");
    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder().name("et-initial-listing-target").displayName("Entity Tracking Initial Listing Target").description(String.format("Specify how initial listing should be handled. %s", NOTE)).allowableValues(new AllowableValue[]{INITIAL_LISTING_TARGET_WINDOW, INITIAL_LISTING_TARGET_ALL}).defaultValue(INITIAL_LISTING_TARGET_ALL.getValue()).build();
    public static final PropertyDescriptor NODE_IDENTIFIER = new PropertyDescriptor.Builder().name("et-node-identifier").displayName("Entity Tracking Node Identifier").description(String.format("The configured value will be appended to the cache key so that listing state can be tracked per NiFi node rather than cluster wide when tracking state is scoped to LOCAL. %s", NOTE)).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("${hostname()}").build();
    static final Supplier<Long> DEFAULT_CURRENT_TIMESTAMP_SUPPLIER = System::currentTimeMillis;
    private final Supplier<Long> currentTimestampSupplier;
    private final Serializer<String> stringSerializer;
    private final Serializer<Map<String, ListedEntity>> listedEntitiesSerializer;
    private final Deserializer<Map<String, ListedEntity>> listedEntitiesDeserializer;
    private final String componentId;
    private final ComponentLog logger;
    private Scope scope;
    private String nodeId;
    private DistributedMapCacheClient mapCacheClient;
    private static final String CACHE_KEY_PREFIX = "ListedEntities";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.processor.util.list.ListedEntityTracker$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/processor/util/list/ListedEntityTracker$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$components$state$Scope = new int[Scope.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$components$state$Scope[Scope.LOCAL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$components$state$Scope[Scope.CLUSTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListedEntityTracker(String str, ComponentLog componentLog) {
        this(str, componentLog, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER);
    }

    ListedEntityTracker(String str, ComponentLog componentLog, Supplier<Long> supplier) {
        this.objectMapper = new ObjectMapper();
        this.stringSerializer = (str2, outputStream) -> {
            outputStream.write(str2.getBytes(StandardCharsets.UTF_8));
        };
        this.listedEntitiesSerializer = (map, outputStream2) -> {
            GZIPOutputStream gZIPOutputStream = new GZIPOutputStream(outputStream2);
            this.objectMapper.writeValue(gZIPOutputStream, map);
            gZIPOutputStream.finish();
        };
        this.listedEntitiesDeserializer = bArr -> {
            if (bArr == null || bArr.length == 0) {
                return null;
            }
            GZIPInputStream gZIPInputStream = new GZIPInputStream(new ByteArrayInputStream(bArr));
            Throwable th = null;
            try {
                try {
                    Map map2 = (Map) this.objectMapper.readValue(gZIPInputStream, new TypeReference<Map<String, ListedEntity>>() { // from class: org.apache.nifi.processor.util.list.ListedEntityTracker.1
                    });
                    if (gZIPInputStream != null) {
                        if (0 != 0) {
                            try {
                                gZIPInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            gZIPInputStream.close();
                        }
                    }
                    return map2;
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (gZIPInputStream != null) {
                    if (th != null) {
                        try {
                            gZIPInputStream.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        gZIPInputStream.close();
                    }
                }
                throw th4;
            }
        };
        this.componentId = str;
        this.logger = componentLog;
        this.currentTimestampSupplier = supplier;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void validateProperties(ValidationContext validationContext, Collection<ValidationResult> collection, Scope scope) {
        validateRequiredProperty(validationContext, collection, TRACKING_STATE_CACHE);
        validateRequiredProperty(validationContext, collection, TRACKING_TIME_WINDOW);
        if (Scope.LOCAL.equals(scope) && StringUtils.isEmpty(validationContext.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue())) {
            collection.add(new ValidationResult.Builder().subject(NODE_IDENTIFIER.getDisplayName()).explanation(String.format("'%s' is required to use local scope with '%s' listing strategy", NODE_IDENTIFIER.getDisplayName(), AbstractListProcessor.BY_ENTITIES.getDisplayName())).build());
        }
    }

    private static void validateRequiredProperty(ValidationContext validationContext, Collection<ValidationResult> collection, PropertyDescriptor propertyDescriptor) {
        if (validationContext.getProperty(propertyDescriptor).isSet()) {
            return;
        }
        String displayName = propertyDescriptor.getDisplayName();
        collection.add(new ValidationResult.Builder().subject(displayName).explanation(String.format("'%s' is required to use '%s' listing strategy", displayName, AbstractListProcessor.BY_ENTITIES.getDisplayName())).valid(false).build());
    }

    private String getCacheKey() {
        switch (AnonymousClass2.$SwitchMap$org$apache$nifi$components$state$Scope[this.scope.ordinal()]) {
            case 1:
                return String.format("%s::%s::%s", CACHE_KEY_PREFIX, this.componentId, this.nodeId);
            case 2:
                return String.format("%s::%s", CACHE_KEY_PREFIX, this.componentId);
            default:
                throw new IllegalArgumentException("Unknown scope: " + this.scope);
        }
    }

    private void persistListedEntities(Map<String, ListedEntity> map) throws IOException {
        String cacheKey = getCacheKey();
        this.logger.debug("Persisting listed entities: {}={}", new Object[]{cacheKey, map});
        this.mapCacheClient.put(cacheKey, map, this.stringSerializer, this.listedEntitiesSerializer);
    }

    private Map<String, ListedEntity> fetchListedEntities() throws IOException {
        String cacheKey = getCacheKey();
        Map<String, ListedEntity> map = (Map) this.mapCacheClient.get(cacheKey, this.stringSerializer, this.listedEntitiesDeserializer);
        this.logger.debug("Fetched listed entities: {}={}", new Object[]{cacheKey, map});
        return map;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearListedEntities() throws IOException {
        this.alreadyListedEntities = null;
        if (this.mapCacheClient != null) {
            String cacheKey = getCacheKey();
            this.logger.debug("Removing listed entities from cache storage: {}", new Object[]{cacheKey});
            this.mapCacheClient.remove(cacheKey, this.stringSerializer);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void trackEntities(ProcessContext processContext, ProcessSession processSession, boolean z, Scope scope, Function<Long, Collection<T>> function, Function<T, Map<String, String>> function2) throws ProcessException {
        boolean z2 = false;
        this.mapCacheClient = processContext.getProperty(TRACKING_STATE_CACHE).asControllerService(DistributedMapCacheClient.class);
        this.scope = scope;
        if (Scope.LOCAL.equals(scope)) {
            this.nodeId = processContext.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue();
        } else {
            this.nodeId = null;
        }
        if (this.alreadyListedEntities == null || z) {
            this.logger.info(z ? "Just elected as Primary node, restoring already-listed entities." : "At the first onTrigger, restoring already-listed entities.");
            try {
                Map<String, ListedEntity> fetchListedEntities = fetchListedEntities();
                if (fetchListedEntities == null) {
                    this.alreadyListedEntities = new ConcurrentHashMap();
                    z2 = true;
                } else {
                    this.alreadyListedEntities = new ConcurrentHashMap(fetchListedEntities);
                }
            } catch (IOException e) {
                throw new ProcessException("Failed to restore already-listed entities due to " + e, e);
            }
        }
        long longValue = (z2 && INITIAL_LISTING_TARGET_ALL.getValue().equals(processContext.getProperty(INITIAL_LISTING_TARGET).getValue())) ? -1L : this.currentTimestampSupplier.get().longValue() - processContext.getProperty(TRACKING_TIME_WINDOW).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        Collection<T> apply = function.apply(Long.valueOf(longValue));
        if (apply.size() == 0) {
            this.logger.debug("No entity is listed. Yielding.");
            processContext.yield();
            return;
        }
        List<ListableEntity> list = (List) apply.stream().filter(listableEntity -> {
            String identifier = listableEntity.getIdentifier();
            if (listableEntity.getTimestamp() < longValue) {
                this.logger.trace("Skipped {} having older timestamp than the minTimestampToList {}.", new Object[]{identifier, Long.valueOf(listableEntity.getTimestamp()), Long.valueOf(longValue)});
                return false;
            }
            ListedEntity listedEntity = this.alreadyListedEntities.get(identifier);
            if (listedEntity == null) {
                this.logger.trace("Picked {} being newly found.", new Object[]{identifier});
                return true;
            }
            if (listableEntity.getTimestamp() > listedEntity.getTimestamp()) {
                this.logger.trace("Picked {} having newer timestamp {} than {}.", new Object[]{identifier, Long.valueOf(listableEntity.getTimestamp()), Long.valueOf(listedEntity.getTimestamp())});
                return true;
            }
            if (listableEntity.getSize() != listedEntity.getSize()) {
                this.logger.trace("Picked {} having different size {} than {}.", new Object[]{identifier, Long.valueOf(listableEntity.getSize()), Long.valueOf(listedEntity.getSize())});
                return true;
            }
            this.logger.trace("Skipped {}, not changed.", new Object[]{identifier, Long.valueOf(listableEntity.getTimestamp()), Long.valueOf(longValue)});
            return false;
        }).collect(Collectors.toList());
        List list2 = (List) this.alreadyListedEntities.entrySet().stream().filter(entry -> {
            return ((ListedEntity) entry.getValue()).getTimestamp() < longValue;
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
        if (list.isEmpty() && list2.isEmpty()) {
            this.logger.debug("None of updated or old entity was found. Yielding.");
            processContext.yield();
            return;
        }
        list2.forEach(str -> {
            this.alreadyListedEntities.remove(str);
        });
        for (ListableEntity listableEntity2 : list) {
            processSession.transfer(processSession.putAllAttributes(processSession.create(), (Map) function2.apply(listableEntity2)), AbstractListProcessor.REL_SUCCESS);
            this.alreadyListedEntities.put(listableEntity2.getIdentifier(), new ListedEntity(listableEntity2.getTimestamp(), listableEntity2.getSize()));
        }
        processSession.commit();
        try {
            this.logger.debug("Removed old entities count: {}, Updated entities count: {}", new Object[]{Integer.valueOf(list2.size()), Integer.valueOf(list.size())});
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Removed old entities: {}, Updated entities: {}", new Object[]{list2, list});
            }
            persistListedEntities(this.alreadyListedEntities);
        } catch (IOException e2) {
            throw new ProcessException("Failed to persist already-listed entities due to " + e2, e2);
        }
    }
}
