package org.apache.nifi.processor.util.list;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.attribute.expression.language.evaluation.functions.PaddingEvaluator;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;

@TriggerSerially
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. The scope used depends on the implementation.")
/* loaded from: input_file:WEB-INF/lib/nifi-processor-utils-1.13.0.jar:org/apache/nifi/processor/util/list/AbstractListProcessor.class */
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. The stored value in the cache service will be migrated into the state when this processor is started at the first time. The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect", "Automatically detect time unit deterministically based on candidate entries timestamp. Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp. E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
    public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds", "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds", "For a target system that does not have millis precision, but has in seconds.");
    public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder().name("target-system-timestamp-precision").displayName("Target System Timestamp Precision").description("Specify timestamp precision at the target system. Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.").required(true).allowableValues(new AllowableValue[]{PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES}).defaultValue(PRECISION_AUTO_DETECT.getValue()).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", "This strategy tracks the latest timestamp of listed entity to determine new/updated entities. Since it only tracks few timestamps, it can manage listing state efficiently. However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy. For example, such situation can happen in a file system if a file with old timestamp is copied or moved into the target directory without its last modified timestamp being updated. Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities. This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'. Works even when multiple subdirectories are being written at the same time while listing is running. However additional DistributedMapCache controller service is required and more JVM heap memory is used. See the description of 'Entity Tracking Time Window' property for further details on how it works.");
    public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", "This strategy lists an entity without any tracking. The same entity will be listed each time on executing this processor. It is recommended to change the default run schedule value. Any property that related to the persisting state will be disregarded.");
    public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window", "This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'. One cycle will list files with modification time falling within the time window. Works even when multiple subdirectories are being written at the same time while listing is running. IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files are accurate.");
    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder().name("listing-strategy").displayName("Listing Strategy").description("Specify how to determine new/updated entities. See each strategy descriptions for detail.").required(true).allowableValues(new AllowableValue[]{BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING}).defaultValue(BY_TIMESTAMPS.getValue()).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.").required(false).identifiesControllerService(RecordSetWriterFactory.class).build();
    private volatile Long lastListedLatestEntryTimestampMillis = null;
    private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
    private volatile Long lastRunTimeNanos = 0L;
    private volatile boolean justElectedPrimaryNode = false;
    private volatile boolean resetState = false;
    private volatile boolean resetEntityTrackingState = false;
    private volatile List<String> latestIdentifiersProcessed = new ArrayList();
    private volatile ListedEntityTracker<T> listedEntityTracker;
    public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
    static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
    static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
    static final String IDENTIFIER_PREFIX = "id";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/nifi-processor-utils-1.13.0.jar:org/apache/nifi/processor/util/list/AbstractListProcessor$StringSerDe.class */
    public static class StringSerDe implements Serializer<String>, Deserializer<String> {
        private StringSerDe() {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m701deserialize(byte[] bArr) throws DeserializationException, IOException {
            if (bArr == null) {
                return null;
            }
            return new String(bArr, StandardCharsets.UTF_8);
        }

        public void serialize(String str, OutputStream outputStream) throws SerializationException, IOException {
            outputStream.write(str.getBytes(StandardCharsets.UTF_8));
        }
    }

    public File getPersistenceFile() {
        return new File("conf/state/" + getIdentifier());
    }

    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (isConfigurationRestored() && isListingResetNecessary(propertyDescriptor)) {
            resetTimeStates();
            this.resetState = true;
            this.resetEntityTrackingState = true;
        }
    }

    public Set<Relationship> getRelationships() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        return hashSet;
    }

    protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        if (BY_ENTITIES.equals(validationContext.getProperty(LISTING_STRATEGY).getValue())) {
            ListedEntityTracker.validateProperties(validationContext, arrayList, getStateScope(validationContext));
        }
        customValidate(validationContext, arrayList);
        return arrayList;
    }

    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> collection) {
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState primaryNodeState) {
        this.justElectedPrimaryNode = primaryNodeState == PrimaryNodeState.ELECTED_PRIMARY_NODE;
    }

    @OnScheduled
    public final void updateState(ProcessContext processContext) throws IOException {
        String path = getPath(processContext);
        DistributedMapCacheClient asControllerService = processContext.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        StateMap state = processContext.getStateManager().getState(getStateScope(processContext));
        if (state.getVersion() == -1) {
            try {
                migrateState(path, asControllerService, processContext.getStateManager(), getStateScope(processContext));
            } catch (IOException e) {
                throw new IOException("Failed to properly migrate state to State Manager", e);
            }
        }
        if (this.lastListedLatestEntryTimestampMillis != null && state.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
            getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
            resetTimeStates();
        }
        if (this.resetState) {
            processContext.getStateManager().clear(getStateScope(processContext));
            this.resetState = false;
        }
    }

    private void migrateState(String str, DistributedMapCacheClient distributedMapCacheClient, StateManager stateManager, Scope scope) throws IOException {
        Long l = null;
        if (distributedMapCacheClient != null) {
            StringSerDe stringSerDe = new StringSerDe();
            String str2 = (String) distributedMapCacheClient.get(getKey(str), stringSerDe, stringSerDe);
            if (str2 != null && !str2.isEmpty()) {
                l = Long.valueOf(deserialize(str2).getLatestTimestamp().getTime());
            }
            if (distributedMapCacheClient != null) {
                try {
                    distributedMapCacheClient.remove(str, new StringSerDe());
                } catch (IOException e) {
                    getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new State Management service, so the Distributed Cache Service is no longer needed.");
                }
            }
        }
        File persistenceFile = getPersistenceFile();
        if (persistenceFile.exists()) {
            Properties properties = new Properties();
            FileInputStream fileInputStream = new FileInputStream(persistenceFile);
            Throwable th = null;
            try {
                try {
                    properties.load(fileInputStream);
                    if (fileInputStream != null) {
                        if (0 != 0) {
                            try {
                                fileInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileInputStream.close();
                        }
                    }
                    String property = properties.getProperty(str);
                    if (property != null) {
                        EntityListing deserialize = deserialize(property);
                        long time = deserialize.getLatestTimestamp().getTime();
                        if (l == null || time > l.longValue()) {
                            l = Long.valueOf(time);
                            this.latestIdentifiersProcessed.clear();
                            this.latestIdentifiersProcessed.addAll(deserialize.getMatchingIdentifiers());
                        }
                    }
                    if (persistenceFile.exists() && !persistenceFile.delete()) {
                        getLogger().warn("Migrated state but failed to delete local persistence file");
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (fileInputStream != null) {
                    if (th != null) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                throw th4;
            }
        }
        if (l != null) {
            stateManager.setState(createStateMap(l.longValue(), l.longValue(), this.latestIdentifiersProcessed), scope);
        }
    }

    private Map<String, String> createStateMap(long j, long j2, List<String> list) throws IOException {
        HashMap hashMap = new HashMap(list.size() + 2);
        hashMap.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(j));
        hashMap.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(j2));
        for (int i = 0; i < list.size(); i++) {
            hashMap.put("id." + i, list.get(i));
        }
        return hashMap;
    }

    private void persist(long j, long j2, List<String> list, ProcessSession processSession, Scope scope) throws IOException {
        processSession.setState(createStateMap(j, j2, list), scope);
    }

    protected String getKey(String str) {
        return getIdentifier() + ".lastListingTime." + str;
    }

    private EntityListing deserialize(String str) throws JsonParseException, JsonMappingException, IOException {
        return (EntityListing) new ObjectMapper().readValue(str, EntityListing.class);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        String value = processContext.getProperty(LISTING_STRATEGY).getValue();
        if (BY_TIMESTAMPS.equals(value)) {
            listByTrackingTimestamps(processContext, processSession);
            return;
        }
        if (BY_ENTITIES.equals(value)) {
            listByTrackingEntities(processContext, processSession);
        } else if (NO_TRACKING.equals(value)) {
            listByNoTracking(processContext, processSession);
        } else {
            if (!BY_TIME_WINDOW.equals(value)) {
                throw new ProcessException("Unknown listing strategy: " + value);
            }
            listByTimeWindow(processContext, processSession);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void listByNoTracking(ProcessContext processContext, ProcessSession processSession) {
        try {
            processContext.getStateManager().clear(getStateScope(processContext));
            try {
                List<ListableEntity> performListing = performListing(processContext, 0L);
                if (performListing == null || performListing.isEmpty()) {
                    processContext.yield();
                    return;
                }
                TreeMap treeMap = new TreeMap();
                for (ListableEntity listableEntity : performListing) {
                    ((List) treeMap.computeIfAbsent(Long.valueOf(listableEntity.getTimestamp()), l -> {
                        return new ArrayList();
                    })).add(listableEntity);
                }
                if (treeMap.size() > 0) {
                    Iterator it = treeMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((List) ((Map.Entry) it.next()).getValue()).iterator();
                        while (it2.hasNext()) {
                            processSession.transfer(processSession.putAllAttributes(processSession.create(), createAttributes((ListableEntity) it2.next(), processContext)), REL_SUCCESS);
                        }
                    }
                }
            } catch (IOException e) {
                getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
                processContext.yield();
            }
        } catch (IOException e2) {
            getLogger().error("Failed to remove previous state from the State Manager.", new Object[]{e2.getMessage()}, e2);
            processContext.yield();
        }
    }

    public void listByTimeWindow(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        if (this.lastListedLatestEntryTimestampMillis == null || this.justElectedPrimaryNode) {
            try {
                Optional.ofNullable(processContext.getStateManager().getState(getStateScope(processContext)).get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)).map(Long::parseLong).ifPresent(l -> {
                    this.lastListedLatestEntryTimestampMillis = l;
                });
                this.justElectedPrimaryNode = false;
            } catch (IOException e) {
                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                processContext.yield();
                return;
            }
        }
        long longValue = ((Long) Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L)).longValue();
        long currentTime = getCurrentTime();
        TreeMap treeMap = new TreeMap();
        try {
            List<T> performListing = performListing(processContext, Long.valueOf(longValue));
            boolean z = false;
            boolean z2 = false;
            Iterator<T> it = performListing.iterator();
            while (it.hasNext()) {
                long timestamp = it.next().getTimestamp();
                if (!z) {
                    z = timestamp % 1000 > 0;
                }
                if (!z2) {
                    z2 = timestamp % DateUtils.MILLIS_PER_MINUTE > 0;
                }
            }
            String value = processContext.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
            if (StringUtils.isBlank(value)) {
                value = getDefaultTimePrecision();
            }
            long longValue2 = currentTime - LISTING_LAG_MILLIS.get(PRECISION_AUTO_DETECT.getValue().equals(value) ? z ? TimeUnit.MILLISECONDS : z2 ? TimeUnit.SECONDS : TimeUnit.MINUTES : PRECISION_MILLIS.getValue().equals(value) ? TimeUnit.MILLISECONDS : PRECISION_SECONDS.getValue().equals(value) ? TimeUnit.SECONDS : TimeUnit.MINUTES).longValue();
            if (getLogger().isTraceEnabled()) {
                getLogger().trace("interval: " + longValue + " - " + longValue2);
                getLogger().trace("entityList: " + ((String) performListing.stream().map(listableEntity -> {
                    return listableEntity.getName() + PaddingEvaluator.DEFAULT_PADDING_STRING + listableEntity.getTimestamp();
                }).collect(Collectors.joining(", "))));
            }
            performListing.stream().filter(listableEntity2 -> {
                return listableEntity2.getTimestamp() >= longValue;
            }).filter(listableEntity3 -> {
                return listableEntity3.getTimestamp() < longValue2;
            }).forEach(listableEntity4 -> {
                ((List) treeMap.computeIfAbsent(Long.valueOf(listableEntity4.getTimestamp()), l2 -> {
                    return new ArrayList();
                })).add(listableEntity4);
            });
            if (getLogger().isTraceEnabled()) {
                getLogger().trace("orderedEntries: " + ((String) treeMap.values().stream().flatMap((v0) -> {
                    return v0.stream();
                }).map(listableEntity5 -> {
                    return listableEntity5.getName() + PaddingEvaluator.DEFAULT_PADDING_STRING + listableEntity5.getTimestamp();
                }).collect(Collectors.joining(", "))));
            }
            if (treeMap.isEmpty()) {
                getLogger().debug("There is no data to list. Yielding.");
                processContext.yield();
                return;
            }
            if (processContext.getProperty(RECORD_WRITER).isSet()) {
                try {
                    createRecordsForEntities(processContext, processSession, treeMap);
                } catch (IOException | SchemaNotFoundException e2) {
                    getLogger().error("Failed to write listing to FlowFile", e2);
                    processContext.yield();
                    return;
                }
            } else {
                createFlowFilesForEntities(processContext, processSession, treeMap);
            }
            try {
                if (getLogger().isTraceEnabled()) {
                    getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + longValue2);
                }
                this.lastListedLatestEntryTimestampMillis = Long.valueOf(longValue2);
                persist(longValue2, longValue2, this.latestIdentifiersProcessed, processSession, getStateScope(processContext));
            } catch (IOException e3) {
                getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", e3);
            }
        } catch (IOException e4) {
            getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e4.getMessage()}, e4);
            processContext.yield();
        }
    }

    protected long getCurrentTime() {
        return System.currentTimeMillis();
    }

    public void listByTrackingTimestamps(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        Long l = this.lastListedLatestEntryTimestampMillis;
        if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || this.justElectedPrimaryNode) {
            try {
                boolean z = false;
                StateMap state = processSession.getState(getStateScope(processContext));
                this.latestIdentifiersProcessed.clear();
                for (Map.Entry entry : state.toMap().entrySet()) {
                    String str = (String) entry.getKey();
                    String str2 = (String) entry.getValue();
                    if (str2 != null && !str2.isEmpty()) {
                        if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(str)) {
                            l = Long.valueOf(Long.parseLong(str2));
                            if (l.equals(this.lastListedLatestEntryTimestampMillis)) {
                                z = true;
                            } else {
                                this.lastListedLatestEntryTimestampMillis = l;
                            }
                        } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(str)) {
                            this.lastProcessedLatestEntryTimestampMillis = Long.valueOf(Long.parseLong(str2));
                        } else if (str.startsWith("id")) {
                            this.latestIdentifiersProcessed.add(str2);
                        }
                    }
                }
                this.justElectedPrimaryNode = false;
                if (z) {
                    processContext.yield();
                    return;
                }
            } catch (IOException e) {
                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
                processContext.yield();
                return;
            }
        }
        long nanoTime = System.nanoTime();
        long currentTimeMillis = System.currentTimeMillis();
        try {
            List<T> performListing = performListing(processContext, l);
            if (performListing == null || performListing.isEmpty()) {
                processContext.yield();
                return;
            }
            Long l2 = null;
            TreeMap treeMap = new TreeMap();
            boolean z2 = false;
            boolean z3 = false;
            for (T t : performListing) {
                long timestamp = t.getTimestamp();
                if (!z2) {
                    z2 = timestamp % 1000 > 0;
                }
                if (!z3) {
                    z3 = timestamp % DateUtils.MILLIS_PER_MINUTE > 0;
                }
                if (l == null || (timestamp >= l.longValue() && timestamp >= this.lastProcessedLatestEntryTimestampMillis.longValue())) {
                    List list = (List) treeMap.get(Long.valueOf(t.getTimestamp()));
                    if (list == null) {
                        list = new ArrayList();
                        treeMap.put(Long.valueOf(t.getTimestamp()), list);
                    }
                    list.add(t);
                }
            }
            int i = 0;
            if (treeMap.size() > 0) {
                l2 = (Long) treeMap.lastKey();
                String value = processContext.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
                if (StringUtils.isBlank(value)) {
                    value = getDefaultTimePrecision();
                }
                TimeUnit timeUnit = PRECISION_AUTO_DETECT.getValue().equals(value) ? z2 ? TimeUnit.MILLISECONDS : z3 ? TimeUnit.SECONDS : TimeUnit.MINUTES : PRECISION_MILLIS.getValue().equals(value) ? TimeUnit.MILLISECONDS : PRECISION_SECONDS.getValue().equals(value) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
                Long l3 = LISTING_LAG_MILLIS.get(timeUnit);
                if (l2.equals(this.lastListedLatestEntryTimestampMillis)) {
                    if (nanoTime - this.lastRunTimeNanos.longValue() < TimeUnit.MILLISECONDS.toNanos(l3.longValue()) || (l2.equals(this.lastProcessedLatestEntryTimestampMillis) && ((List) treeMap.get(l2)).stream().allMatch(listableEntity -> {
                        return this.latestIdentifiersProcessed.contains(listableEntity.getIdentifier());
                    }))) {
                        processContext.yield();
                        return;
                    }
                } else if (timeUnit.toMillis(timeUnit.convert(currentTimeMillis - l3.longValue(), TimeUnit.MILLISECONDS)) < l2.longValue()) {
                    treeMap.remove(l2);
                }
                if (processContext.getProperty(RECORD_WRITER).isSet()) {
                    try {
                        i = createRecordsForEntities(processContext, processSession, treeMap);
                    } catch (IOException | SchemaNotFoundException e2) {
                        getLogger().error("Failed to write listing to FlowFile", e2);
                        processContext.yield();
                        return;
                    }
                } else {
                    i = createFlowFilesForEntities(processContext, processSession, treeMap);
                }
            }
            if (l2 == null) {
                getLogger().debug("There is no data to list. Yielding.");
                processContext.yield();
                if (this.lastListedLatestEntryTimestampMillis == null) {
                    this.lastListedLatestEntryTimestampMillis = 0L;
                    return;
                }
                return;
            }
            boolean z4 = i > 0;
            if (!l2.equals(this.lastListedLatestEntryTimestampMillis) || z4) {
                try {
                    this.lastListedLatestEntryTimestampMillis = l2;
                    persist(l2.longValue(), this.lastProcessedLatestEntryTimestampMillis.longValue(), this.latestIdentifiersProcessed, processSession, getStateScope(processContext));
                } catch (IOException e3) {
                    getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or if another node begins executing this Processor, data duplication may occur.", e3);
                }
            }
            if (z4) {
                if (!((Long) treeMap.lastKey()).equals(this.lastProcessedLatestEntryTimestampMillis)) {
                    this.latestIdentifiersProcessed.clear();
                }
                this.latestIdentifiersProcessed.addAll((Collection) ((List) treeMap.lastEntry().getValue()).stream().map((v0) -> {
                    return v0.getIdentifier();
                }).collect(Collectors.toList()));
                this.lastProcessedLatestEntryTimestampMillis = (Long) treeMap.lastKey();
                getLogger().info("Successfully created listing with {} new objects", new Object[]{Integer.valueOf(i)});
                processSession.commit();
            }
            this.lastRunTimeNanos = Long.valueOf(nanoTime);
        } catch (IOException e4) {
            getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e4.getMessage()}, e4);
            processContext.yield();
        }
    }

    private int createRecordsForEntities(ProcessContext processContext, ProcessSession processSession, Map<Long, List<T>> map) throws IOException, SchemaNotFoundException {
        RecordSetWriterFactory asControllerService = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        int i = 0;
        FlowFile create = processSession.create();
        OutputStream write = processSession.write(create);
        Throwable th = null;
        try {
            RecordSetWriter createWriter = asControllerService.createWriter(getLogger(), getRecordSchema(), write, Collections.emptyMap());
            Throwable th2 = null;
            try {
                createWriter.beginRecordSet();
                for (Map.Entry<Long, List<T>> entry : map.entrySet()) {
                    List<T> value = entry.getValue();
                    if (entry.getKey().equals(this.lastProcessedLatestEntryTimestampMillis)) {
                        value = (List) value.stream().filter(listableEntity -> {
                            return !this.latestIdentifiersProcessed.contains(listableEntity.getIdentifier());
                        }).collect(Collectors.toList());
                    }
                    Iterator<T> it = value.iterator();
                    while (it.hasNext()) {
                        i++;
                        createWriter.write(it.next().toRecord());
                    }
                }
                WriteResult finishRecordSet = createWriter.finishRecordSet();
                if (createWriter != null) {
                    if (0 != 0) {
                        try {
                            createWriter.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                if (i == 0) {
                    processSession.remove(create);
                    return 0;
                }
                HashMap hashMap = new HashMap(finishRecordSet.getAttributes());
                hashMap.put("record.count", String.valueOf(finishRecordSet.getRecordCount()));
                processSession.transfer(processSession.putAllAttributes(create, hashMap), REL_SUCCESS);
                return i;
            } catch (Throwable th4) {
                if (createWriter != null) {
                    if (0 != 0) {
                        try {
                            createWriter.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (write != null) {
                if (0 != 0) {
                    try {
                        write.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    write.close();
                }
            }
        }
    }

    private int createFlowFilesForEntities(ProcessContext processContext, ProcessSession processSession, Map<Long, List<T>> map) {
        int i = 0;
        for (Map.Entry<Long, List<T>> entry : map.entrySet()) {
            List<T> value = entry.getValue();
            if (entry.getKey().equals(this.lastProcessedLatestEntryTimestampMillis)) {
                value = (List) value.stream().filter(listableEntity -> {
                    return !this.latestIdentifiersProcessed.contains(listableEntity.getIdentifier());
                }).collect(Collectors.toList());
            }
            Iterator<T> it = value.iterator();
            while (it.hasNext()) {
                i++;
                processSession.transfer(processSession.putAllAttributes(processSession.create(), createAttributes(it.next(), processContext)), REL_SUCCESS);
            }
        }
        return i;
    }

    protected String getDefaultTimePrecision() {
        return TARGET_SYSTEM_TIMESTAMP_PRECISION.getDefaultValue();
    }

    private void resetTimeStates() {
        this.lastListedLatestEntryTimestampMillis = null;
        this.lastProcessedLatestEntryTimestampMillis = 0L;
        this.lastRunTimeNanos = 0L;
        this.latestIdentifiersProcessed.clear();
    }

    protected abstract Map<String, String> createAttributes(T t, ProcessContext processContext);

    protected abstract String getPath(ProcessContext processContext);

    protected abstract List<T> performListing(ProcessContext processContext, Long l) throws IOException;

    protected abstract boolean isListingResetNecessary(PropertyDescriptor propertyDescriptor);

    protected abstract Scope getStateScope(PropertyContext propertyContext);

    protected abstract RecordSchema getRecordSchema();

    @OnScheduled
    public void initListedEntityTracker(ProcessContext processContext) {
        boolean equals = BY_ENTITIES.getValue().equals(processContext.getProperty(LISTING_STRATEGY).getValue());
        if (this.listedEntityTracker != null && (this.resetEntityTrackingState || !equals)) {
            try {
                this.listedEntityTracker.clearListedEntities();
            } catch (IOException e) {
                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
            }
        }
        this.resetEntityTrackingState = false;
        if (!equals) {
            this.listedEntityTracker = null;
        } else if (this.listedEntityTracker == null) {
            this.listedEntityTracker = createListedEntityTracker();
        }
    }

    protected ListedEntityTracker<T> createListedEntityTracker() {
        return new ListedEntityTracker<>(getIdentifier(), getLogger(), getRecordSchema());
    }

    private void listByTrackingEntities(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        this.listedEntityTracker.trackEntities(processContext, processSession, this.justElectedPrimaryNode, getStateScope(processContext), l -> {
            try {
                return performListing(processContext, l);
            } catch (IOException e) {
                getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
                return Collections.emptyList();
            }
        }, listableEntity -> {
            return createAttributes(listableEntity, processContext);
        });
        this.justElectedPrimaryNode = false;
    }

    static {
        HashMap hashMap = new HashMap();
        hashMap.put(TimeUnit.MILLISECONDS, 100L);
        hashMap.put(TimeUnit.SECONDS, 1000L);
        hashMap.put(TimeUnit.MINUTES, Long.valueOf(DateUtils.MILLIS_PER_MINUTE));
        LISTING_LAG_MILLIS = Collections.unmodifiableMap(hashMap);
    }
}
