package org.opentripplanner.ext.siri.updater;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.entur.protobuf.mapper.SiriMapper;
import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.ext.vectortiles.VectorTilesResource;
import org.opentripplanner.standalone.server.EtagRequestFilter;
import org.opentripplanner.transit.service.DefaultTransitService;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.GraphUpdater;
import org.opentripplanner.updater.WriteToGraphCallback;
import org.opentripplanner.updater.trip.UpdateResult;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.opentripplanner.util.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.EstimatedVersionFrameStructure;
import uk.org.siri.siri20.Siri;
import uk.org.siri.www.siri.SiriType;

/* loaded from: input_file:org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater.class */
public class SiriETGooglePubsubUpdater implements GraphUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(SiriETGooglePubsubUpdater.class);
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0);
    private static final AtomicLong UPDATE_COUNTER = new AtomicLong(0);
    private static final AtomicLong SIZE_COUNTER = new AtomicLong(0);
    private final URI dataInitializationUrl;
    private final String feedId;
    private final Duration reconnectPeriod;
    private final Duration initialGetDataTimeout;
    private final SubscriptionAdminClient subscriptionAdminClient;
    private final ProjectSubscriptionName subscriptionName;
    private final ProjectTopicName topic;
    private final PushConfig pushConfig;
    private final String configRef;
    private final SiriTimetableSnapshotSource snapshotSource;
    private final SiriFuzzyTripMatcher fuzzyTripMatcher;
    private WriteToGraphCallback saveResultOnGraph;
    private transient long startTime;
    private boolean primed;
    private final Consumer<UpdateResult> recordMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater$EstimatedTimetableMessageReceiver.class */
    public class EstimatedTimetableMessageReceiver implements MessageReceiver {
        EstimatedTimetableMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            try {
                SiriETGooglePubsubUpdater.SIZE_COUNTER.addAndGet(pubsubMessage.getData().size());
                Siri mapToJaxb = SiriMapper.mapToJaxb(SiriType.parseFrom(pubsubMessage.getData()));
                if (mapToJaxb.getServiceDelivery() != null) {
                    List estimatedTimetableDeliveries = mapToJaxb.getServiceDelivery().getEstimatedTimetableDeliveries();
                    int i = 0;
                    try {
                        i = ((EstimatedVersionFrameStructure) ((EstimatedTimetableDeliveryStructure) estimatedTimetableDeliveries.get(0)).getEstimatedJourneyVersionFrames().get(0)).getEstimatedVehicleJourneies().size();
                    } catch (Throwable th) {
                    }
                    long addAndGet = SiriETGooglePubsubUpdater.UPDATE_COUNTER.addAndGet(i);
                    long incrementAndGet = SiriETGooglePubsubUpdater.MESSAGE_COUNTER.incrementAndGet();
                    if (incrementAndGet % 1000 == 0) {
                        SiriETGooglePubsubUpdater.LOG.info("Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", new Object[]{Long.valueOf(incrementAndGet), Long.valueOf(addAndGet), FileUtils.byteCountToDisplaySize(SiriETGooglePubsubUpdater.SIZE_COUNTER.get()), Long.valueOf(SiriETGooglePubsubUpdater.this.now() - mapToJaxb.getServiceDelivery().getResponseTimestamp().toInstant().toEpochMilli()), SiriETGooglePubsubUpdater.this.getTimeSinceStartupString()});
                    }
                    Future<?> execute = SiriETGooglePubsubUpdater.this.saveResultOnGraph.execute((graph, transitModel) -> {
                        SiriETGooglePubsubUpdater.this.recordMetrics.accept(SiriETGooglePubsubUpdater.this.snapshotSource.applyEstimatedTimetable(transitModel, SiriETGooglePubsubUpdater.this.fuzzyTripMatcher, SiriETGooglePubsubUpdater.this.feedId, false, estimatedTimetableDeliveries));
                    });
                    if (!SiriETGooglePubsubUpdater.this.isPrimed()) {
                        try {
                            execute.get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                ackReplyConsumer.ack();
            } catch (InvalidProtocolBufferException e2) {
                throw new RuntimeException((Throwable) e2);
            }
        }
    }

    public SiriETGooglePubsubUpdater(SiriETGooglePubsubUpdaterParameters siriETGooglePubsubUpdaterParameters, TransitModel transitModel, SiriTimetableSnapshotSource siriTimetableSnapshotSource) {
        this.configRef = siriETGooglePubsubUpdaterParameters.configRef();
        this.dataInitializationUrl = URI.create(siriETGooglePubsubUpdaterParameters.dataInitializationUrl());
        this.feedId = siriETGooglePubsubUpdaterParameters.feedId();
        this.reconnectPeriod = siriETGooglePubsubUpdaterParameters.reconnectPeriod();
        this.initialGetDataTimeout = siriETGooglePubsubUpdaterParameters.initialGetDataTimeout();
        this.snapshotSource = siriTimetableSnapshotSource;
        String str = System.getenv("HOSTNAME");
        str = (str == null || str.isEmpty()) ? "otp-" + UUID.randomUUID().toString() : str;
        String projectName = siriETGooglePubsubUpdaterParameters.projectName();
        String str2 = siriETGooglePubsubUpdaterParameters.topicName();
        this.subscriptionName = ProjectSubscriptionName.of(projectName, str);
        this.topic = ProjectTopicName.of(projectName, str2);
        this.pushConfig = PushConfig.getDefaultInstance();
        this.fuzzyTripMatcher = siriETGooglePubsubUpdaterParameters.fuzzyTripMatching() ? SiriFuzzyTripMatcher.of(new DefaultTransitService(transitModel)) : null;
        try {
            if (System.getenv("GOOGLE_APPLICATION_CREDENTIALS") == null || System.getenv("GOOGLE_APPLICATION_CREDENTIALS").isEmpty()) {
                throw new RuntimeException("Google Pubsub updater is configured, but environment variable 'GOOGLE_APPLICATION_CREDENTIALS' is not defined. See https://cloud.google.com/docs/authentication/getting-started");
            }
            this.subscriptionAdminClient = SubscriptionAdminClient.create();
            addShutdownHook();
            this.recordMetrics = TripUpdateMetrics.streaming(siriETGooglePubsubUpdaterParameters);
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override // org.opentripplanner.updater.GraphUpdater
    public void setGraphUpdaterManager(WriteToGraphCallback writeToGraphCallback) {
        this.saveResultOnGraph = writeToGraphCallback;
    }

    @Override // org.opentripplanner.updater.GraphUpdater
    public void run() throws IOException {
        if (this.subscriptionAdminClient == null) {
            throw new RuntimeException("Unable to initialize Google Pubsub-updater: System.getenv('GOOGLE_APPLICATION_CREDENTIALS') = " + System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
        }
        LOG.info("Creating subscription {}", this.subscriptionName);
        Subscription createSubscription = this.subscriptionAdminClient.createSubscription(Subscription.newBuilder().setTopic(this.topic.toString()).setName(this.subscriptionName.toString()).setPushConfig(this.pushConfig).setMessageRetentionDuration(com.google.protobuf.Duration.newBuilder().setSeconds(600L).build()).setExpirationPolicy(ExpirationPolicy.newBuilder().setTtl(com.google.protobuf.Duration.newBuilder().setSeconds(86400L).build()).build()).build());
        LOG.info("Created subscription {}", this.subscriptionName);
        this.startTime = now();
        EstimatedTimetableMessageReceiver estimatedTimetableMessageReceiver = new EstimatedTimetableMessageReceiver();
        int i = 1000;
        int i2 = 1;
        while (!isPrimed()) {
            try {
                initializeData(this.dataInitializationUrl, estimatedTimetableMessageReceiver);
            } catch (Exception e) {
                i *= 2;
                int i3 = i2;
                i2++;
                LOG.warn("Caught Exception while initializing data, will retry after {} ms - attempt number {}. ({})", new Object[]{Integer.valueOf(i), Integer.valueOf(i3), e.toString()});
                try {
                    Thread.sleep(i);
                } catch (InterruptedException e2) {
                }
            }
        }
        Subscriber subscriber = null;
        while (true) {
            try {
                subscriber = Subscriber.newBuilder(createSubscription.getName(), estimatedTimetableMessageReceiver).build();
                subscriber.startAsync().awaitRunning();
                subscriber.awaitTerminated();
            } catch (IllegalStateException e3) {
                if (subscriber != null) {
                    subscriber.stopAsync();
                }
            }
            try {
                Thread.sleep(this.reconnectPeriod.toMillis());
            } catch (InterruptedException e4) {
                e4.printStackTrace();
            }
        }
    }

    @Override // org.opentripplanner.updater.GraphUpdater
    public void teardown() {
        if (this.subscriptionAdminClient != null) {
            LOG.info("Deleting subscription {}", this.subscriptionName);
            this.subscriptionAdminClient.deleteSubscription(this.subscriptionName);
            LOG.info("Subscription deleted {} - time since startup: {} sec", this.subscriptionName, Long.valueOf((now() - this.startTime) / 1000));
        }
    }

    @Override // org.opentripplanner.updater.GraphUpdater
    public boolean isPrimed() {
        return this.primed;
    }

    @Override // org.opentripplanner.updater.GraphUpdater
    public String getConfigRef() {
        return this.configRef;
    }

    private void addShutdownHook() {
        try {
            Runtime.getRuntime().addShutdownHook(new Thread(this::teardown));
            LOG.info("Shutdown-hook to clean up Google Pubsub subscription has been added.");
        } catch (IllegalStateException e) {
            LOG.info("Instance is already shutting down - cleaning up immediately.", e);
            teardown();
        }
    }

    private long now() {
        return ZonedDateTime.now().toInstant().toEpochMilli();
    }

    private String getTimeSinceStartupString() {
        return DurationFormatUtils.formatDuration(now() - this.startTime, "HH:mm:ss");
    }

    private void initializeData(URI uri, EstimatedTimetableMessageReceiver estimatedTimetableMessageReceiver) throws IOException {
        if (uri != null) {
            LOG.info("Fetching initial data from {}", uri);
            long currentTimeMillis = System.currentTimeMillis();
            ByteString readFrom = ByteString.readFrom(HttpUtils.getData(uri, this.initialGetDataTimeout, Map.of(EtagRequestFilter.HEADER_CONTENT_TYPE, VectorTilesResource.APPLICATION_X_PROTOBUF)));
            final long currentTimeMillis2 = System.currentTimeMillis();
            LOG.info("Fetching initial data - finished after {} ms, got {} bytes", Long.valueOf(currentTimeMillis2 - currentTimeMillis), FileUtils.byteCountToDisplaySize(readFrom.size()));
            estimatedTimetableMessageReceiver.receiveMessage(PubsubMessage.newBuilder().setData(readFrom).build(), new AckReplyConsumer() { // from class: org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdater.1
                public void ack() {
                    SiriETGooglePubsubUpdater.this.primed = true;
                    SiriETGooglePubsubUpdater.LOG.info("Pubsub updater initialized after {} ms: [messages: {},  updates: {}, total size: {}, time since startup: {}]", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis2), Long.valueOf(SiriETGooglePubsubUpdater.MESSAGE_COUNTER.get()), Long.valueOf(SiriETGooglePubsubUpdater.UPDATE_COUNTER.get()), FileUtils.byteCountToDisplaySize(SiriETGooglePubsubUpdater.SIZE_COUNTER.get()), SiriETGooglePubsubUpdater.this.getTimeSinceStartupString()});
                }

                public void nack() {
                }
            });
        }
    }
}
