/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.PendingEvent;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.util.RetriesExhaustedException;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentSelector {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SegmentSelector.class);
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private final Object $lock = new Object[0];
    private final Stream stream;
    private final Controller controller;
    private final SegmentOutputStreamFactory outputStreamFactory;
    @GuardedBy(value="$lock")
    private final Random random = RandomFactory.create();
    @GuardedBy(value="$lock")
    private StreamSegments currentSegments;
    @GuardedBy(value="$lock")
    private final Map<Segment, SegmentOutputStream> writers = new HashMap<Segment, SegmentOutputStream>();
    private final EventWriterConfig config;
    private final DelegationTokenProvider tokenProvider;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SegmentOutputStream getSegmentOutputStreamForKey(String routingKey) {
        Object object = this.$lock;
        synchronized (object) {
            if (this.currentSegments == null) {
                return null;
            }
            return this.writers.get(this.getSegmentForEvent(routingKey));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Segment getSegmentForEvent(String routingKey) {
        Object object = this.$lock;
        synchronized (object) {
            if (this.currentSegments == null) {
                return null;
            }
            if (routingKey == null) {
                return this.currentSegments.getSegmentForKey(this.random.nextDouble());
            }
            return this.currentSegments.getSegmentForKey(routingKey);
        }
    }

    public List<PendingEvent> refreshSegmentEventWritersUponSealed(Segment sealedSegment, Consumer<Segment> segmentSealedCallback) {
        StreamSegmentsWithPredecessors successors = Futures.getAndHandleExceptions(this.controller.getSuccessors(sealedSegment), t -> {
            log.error("Error while fetching successors for segment: {}", (Object)sealedSegment, t);
            RuntimeException e = t instanceof RetriesExhaustedException ? new ControllerFailureException((Throwable)t) : new NoSuchSegmentException(sealedSegment.toString(), (Throwable)t);
            this.removeAllWriters().forEach(event -> event.getAckFuture().completeExceptionally(e));
            return null;
        });
        if (successors == null) {
            return Collections.emptyList();
        }
        if (successors.getSegmentToPredecessor().isEmpty()) {
            log.warn("Stream {} is sealed since no successor segments found for segment {} ", (Object)sealedSegment.getStream(), (Object)sealedSegment);
            IllegalStateException e = new IllegalStateException("Writes cannot proceed since the stream is sealed");
            this.removeAllWriters().forEach(pendingEvent -> pendingEvent.getAckFuture().completeExceptionally(e));
            return Collections.emptyList();
        }
        return this.updateSegmentsUponSealed(successors, sealedSegment, segmentSealedCallback);
    }

    public List<PendingEvent> refreshSegmentEventWriters(Consumer<Segment> segmentSealedCallBack) {
        log.info("Refreshing segments for stream {}", (Object)this.stream);
        return this.updateSegments(Futures.getAndHandleExceptions(this.controller.getCurrentSegments(this.stream.getScope(), this.stream.getStreamName()), RuntimeException::new), segmentSealedCallBack);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeSegmentWriter(Segment segment) {
        Object object = this.$lock;
        synchronized (object) {
            this.writers.remove(segment);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<PendingEvent> updateSegments(StreamSegments newStreamSegments, Consumer<Segment> segmentSealedCallBack) {
        Object object = this.$lock;
        synchronized (object) {
            Preconditions.checkState(newStreamSegments.getNumberOfSegments() > 0, "Writers cannot proceed writing since the stream %s is sealed", (Object)this.stream);
            this.currentSegments = newStreamSegments;
            this.createMissingWriters(segmentSealedCallBack);
            ArrayList<PendingEvent> toResend = new ArrayList<PendingEvent>();
            Iterator<Map.Entry<Segment, SegmentOutputStream>> iter = this.writers.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Segment, SegmentOutputStream> entry = iter.next();
                if (this.currentSegments.getSegments().contains(entry.getKey())) continue;
                SegmentOutputStream writer = entry.getValue();
                log.info("Closing writer {} on segment {} during segment refresh", (Object)writer, (Object)entry.getKey());
                iter.remove();
                try {
                    writer.close();
                }
                catch (SegmentSealedException e) {
                    log.info("Caught segment sealed while refreshing on segment {}", (Object)entry.getKey());
                }
                toResend.addAll(writer.getUnackedEventsOnSeal());
            }
            return toResend;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<PendingEvent> updateSegmentsUponSealed(StreamSegmentsWithPredecessors successors, Segment sealedSegment, Consumer<Segment> segmentSealedCallback) {
        Object object = this.$lock;
        synchronized (object) {
            this.currentSegments = this.currentSegments.withReplacementRange(sealedSegment, successors);
            this.createMissingWriters(segmentSealedCallback);
            log.debug("Fetch unacked events for segment: {}, and adding new segments {}", (Object)sealedSegment, (Object)this.currentSegments);
            return this.writers.get(sealedSegment).getUnackedEventsOnSeal();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<PendingEvent> removeAllWriters() {
        Object object = this.$lock;
        synchronized (object) {
            ArrayList<PendingEvent> pendingEvents = new ArrayList<PendingEvent>();
            this.writers.values().forEach(out -> pendingEvents.addAll(out.getUnackedEventsOnSeal()));
            this.writers.clear();
            return pendingEvents;
        }
    }

    private void createMissingWriters(Consumer<Segment> segmentSealedCallBack) {
        this.tokenProvider.populateToken(this.currentSegments.getDelegationToken());
        for (Segment segment : this.currentSegments.getSegments()) {
            if (this.writers.containsKey(segment)) continue;
            log.debug("Creating writer for segment {}", (Object)segment);
            SegmentOutputStream out = this.outputStreamFactory.createOutputStreamForSegment(segment, segmentSealedCallBack, this.config, this.tokenProvider);
            this.writers.put(segment, out);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<Segment> getSegments() {
        Object object = this.$lock;
        synchronized (object) {
            if (this.currentSegments == null) {
                return Collections.emptyList();
            }
            return new ArrayList<Segment>(this.currentSegments.getSegments());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<Segment, SegmentOutputStream> getWriters() {
        Object object = this.$lock;
        synchronized (object) {
            return new HashMap<Segment, SegmentOutputStream>(this.writers);
        }
    }

    @ConstructorProperties(value={"stream", "controller", "outputStreamFactory", "config", "tokenProvider"})
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public SegmentSelector(Stream stream, Controller controller, SegmentOutputStreamFactory outputStreamFactory, EventWriterConfig config, DelegationTokenProvider tokenProvider) {
        this.stream = stream;
        this.controller = controller;
        this.outputStreamFactory = outputStreamFactory;
        this.config = config;
        this.tokenProvider = tokenProvider;
    }
}

