/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.fahrschein.example;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import okhttp3.CertificatePinner;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.zalando.fahrschein.AccessTokenProvider;
import org.zalando.fahrschein.BackoffStrategy;
import org.zalando.fahrschein.CursorManager;
import org.zalando.fahrschein.EventProcessingException;
import org.zalando.fahrschein.ExponentialBackoffStrategy;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.NoBackoffStrategy;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.ZignAccessTokenProvider;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.example.domain.OrderEvent;
import org.zalando.fahrschein.example.domain.OrderEventProcessor;
import org.zalando.fahrschein.example.domain.SalesOrder;
import org.zalando.fahrschein.example.domain.SalesOrderPlaced;
import org.zalando.fahrschein.http.apache.HttpComponentsRequestFactory;
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.spring.SpringRequestFactory;
import org.zalando.fahrschein.inmemory.InMemoryCursorManager;
import org.zalando.fahrschein.jdbc.JdbcCursorManager;
import org.zalando.fahrschein.jdbc.JdbcPartitionManager;
import org.zalando.jackson.datatype.money.MoneyModule;

public class Main {
    /** Logger used by the example listeners in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
    /** Event type name used by the single-event-type subscribe/stream examples in this class. */
    private static final String SALES_ORDER_SERVICE_ORDER_PLACED = "sales-order-service.order-placed";
    /** Base URI handed to {@code NakadiClient.builder(...)} by every example in this class. */
    private static final URI NAKADI_URI = URI.create("https://nakadi-staging.aruha-test.zalan.do");
    /** JDBC URL passed to the Hikari configuration by the JDBC-backed examples. */
    private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/local_nakadi_cursor_db";
    /** Database user name; presumably intended for the database at {@link #JDBC_URL}. */
    private static final String JDBC_USERNAME = "postgres";
    /** Database password; presumably intended for the database at {@link #JDBC_URL}. */
    private static final String JDBC_PASSWORD = "postgres";
    /** Event type name; one of the two event types subscribed to by the multiple-events example. */
    public static final String ORDER_CREATED = "eventlog.e96001_order_created";
    /** Event type name; the other event type subscribed to by the multiple-events example. */
    public static final String ORDER_PAYMENT_STATUS_ACCEPTED = "eventlog.e62001_order_payment_status_accepted";

    /**
     * Entry point of the example.
     *
     * <p>Configures a Jackson {@link ObjectMapper} (snake_case property names, non-null
     * inclusion, ISO dates, lenient about unknown properties, plus the java.time, JDK 8,
     * money and parameter-names modules), builds a listener that logs every received sales
     * order, and runs the Apache HttpComponents based subscription example with it.
     *
     * @param args command line arguments; not used
     * @throws IOException if the subscription example fails with an I/O error
     * @throws InterruptedException kept in the signature for compatibility; nothing in this
     *         method body throws it directly
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.registerModule(new Jdk8Module());
        objectMapper.registerModule(new MoneyModule());
        objectMapper.registerModule(new ParameterNamesModule());

        // The listener must carry its element type: with a raw Listener the batch elements
        // are plain Objects and the typed for-each below would not compile.
        Listener<SalesOrderPlaced> listener = events -> {
            // Very rarely fail a batch on purpose to exercise the error path.
            if (Math.random() < 1.0E-7) {
                throw new EventProcessingException("Random failure");
            }
            for (SalesOrderPlaced salesOrderPlaced : events) {
                SalesOrder order = salesOrderPlaced.getSalesOrder();
                LOG.info("Received sales order [{}] created at [{}]", order.getOrderNumber(), order.getCreatedAt());
            }
        };

        subscriptionListenHttpComponents(objectMapper, listener);
    }

    /**
     * Subscribes to two event types at once ({@link #ORDER_CREATED} and
     * {@link #ORDER_PAYMENT_STATUS_ACCEPTED}) and hands every received {@code OrderEvent}
     * to a single {@code OrderEventProcessor} via {@code event.process(processor)}.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @throws IOException if creating the subscription or consuming the stream fails
     */
    private static void subscriptionMultipleEvents(ObjectMapper objectMapper) throws IOException {
        OrderEventProcessor processor = new OrderEventProcessor();

        // Typed listener: a raw Listener would make the batch elements plain Objects and
        // the typed for-each below would not compile.
        Listener<OrderEvent> listener = events -> {
            for (OrderEvent event : events) {
                event.process(processor);
            }
        };

        NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .build();

        HashSet<String> eventNames = new HashSet<>(Arrays.asList(ORDER_CREATED, ORDER_PAYMENT_STATUS_ACCEPTED));

        Subscription subscription = nakadiClient.subscription("fahrschein-demo", eventNames)
                .withConsumerGroup("fahrschein-demo")
                .readFromEnd()
                .subscribe();

        nakadiClient.stream(subscription)
                .withObjectMapper(objectMapper)
                .listen(OrderEvent.class, listener);
    }

    /**
     * Creates a subscription for the order-placed event type that starts from a hard-coded
     * list of cursors, then streams the events of that subscription to the given listener.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if creating the subscription or consuming the stream fails
     */
    private static void subscriptionListenWithPositionCursors(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        // One fixed cursor for each of "0" .. "7"; the arguments are presumably
        // (partition, offset, event name) - confirm against the Cursor class.
        List<Cursor> startPositions = Arrays.asList(
                new Cursor("0", "000000000000109993", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("1", "000000000000110085", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("2", "000000000000109128", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("3", "000000000000110205", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("4", "000000000000109161", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("5", "000000000000109087", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("6", "000000000000109100", SALES_ORDER_SERVICE_ORDER_PLACED),
                new Cursor("7", "000000000000109146", SALES_ORDER_SERVICE_ORDER_PLACED));

        NakadiClient client = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .build();

        Subscription fromCursors = client.subscription("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED)
                .withConsumerGroup("fahrschein-demo")
                .readFromCursors(startPositions)
                .subscribe();

        client.stream(fromCursors)
                .withObjectMapper(objectMapper)
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Creates a subscription for the order-placed event type using {@code readFromEnd()}
     * and streams its events to the given listener, using the client's default request
     * factory.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if creating the subscription or consuming the stream fails
     */
    private static void subscriptionListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        NakadiClient client = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .build();

        Subscription fromEnd = client.subscription("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED)
                .withConsumerGroup("fahrschein-demo")
                .readFromEnd()
                .subscribe();

        client.stream(fromEnd)
                .withObjectMapper(objectMapper)
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Same as the plain subscription example, but talks to Nakadi through an explicitly
     * configured Apache HttpComponents client.
     *
     * <p>The HTTP client is created in a try-with-resources block so that it is closed
     * once {@code listen(...)} returns or throws; previously it was never closed.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if creating the subscription, consuming the stream, or closing
     *         the HTTP client fails
     */
    private static void subscriptionListenHttpComponents(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(60000)
                .setConnectTimeout(2000)
                .setConnectionRequestTimeout(8000)
                .setContentCompressionEnabled(false)
                .build();

        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setBufferSize(512)
                .build();

        try (CloseableHttpClient httpClient = HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .setDefaultConnectionConfig(connectionConfig)
                .setConnectionTimeToLive(30L, TimeUnit.SECONDS)
                .disableAutomaticRetries()
                .disableRedirectHandling()
                .setMaxConnTotal(8)
                .setMaxConnPerRoute(2)
                .build()) {

            HttpComponentsRequestFactory requestFactory = new HttpComponentsRequestFactory(httpClient);

            NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                    .withRequestFactory(requestFactory)
                    .withAccessTokenProvider(new ZignAccessTokenProvider())
                    .build();

            Subscription subscription = nakadiClient.subscription("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED)
                    .withConsumerGroup("fahrschein-demo")
                    .readFromEnd()
                    .subscribe();

            nakadiClient.stream(subscription)
                    .withObjectMapper(objectMapper)
                    .listen(SalesOrderPlaced.class, listener);
        }
    }

    /**
     * Same as the plain subscription example, but routes requests through Spring's
     * OkHttp3 request factory wrapped in a {@code SpringRequestFactory}. The OkHttp client
     * is configured with explicit timeouts, its own connection pool and a certificate pin
     * for the host of {@code NAKADI_URI}.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if creating the subscription or consuming the stream fails
     */
    private static void subscriptionListenSpringAdapter(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .readTimeout(60L, TimeUnit.SECONDS)
                .connectTimeout(2L, TimeUnit.SECONDS)
                .writeTimeout(10L, TimeUnit.SECONDS)
                .connectionPool(new ConnectionPool(2, 300L, TimeUnit.SECONDS))
                .certificatePinner(new CertificatePinner.Builder()
                        .add(NAKADI_URI.getHost(), "sha256/KMUmME9xy7BKVUZ80VcmQ75zIZo16IZRTqVRYHVZeWY=")
                        .build())
                .build();

        OkHttp3ClientHttpRequestFactory springHttpFactory = new OkHttp3ClientHttpRequestFactory(okHttpClient);
        SpringRequestFactory requestFactory = new SpringRequestFactory(springHttpFactory);

        NakadiClient client = NakadiClient.builder(NAKADI_URI)
                .withRequestFactory(requestFactory)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .build();

        Subscription fromEnd = client.subscription("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED)
                .withConsumerGroup("fahrschein-demo")
                .readFromEnd()
                .subscribe();

        client.stream(fromEnd)
                .withObjectMapper(objectMapper)
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Streams the order-placed event type directly (no subscription), using an in-memory
     * cursor manager, {@code readFromBegin(...)} over the partitions reported by the
     * client, and an exponential backoff strategy limited to 10 retries.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if fetching the partitions or consuming the stream fails
     */
    private static void simpleListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        InMemoryCursorManager inMemoryCursors = new InMemoryCursorManager();

        NakadiClient client = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .withCursorManager(inMemoryCursors)
                .build();

        // Raw List kept on purpose: the partition element type is not imported in this file.
        List allPartitions = client.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED);

        client.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                .readFromBegin(allPartitions)
                .withObjectMapper(objectMapper)
                .withBackoffStrategy(new ExponentialBackoffStrategy().withMaxRetries(10))
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Streams the order-placed event type directly (no subscription) with a JDBC-backed
     * cursor manager on top of a Hikari connection pool.
     *
     * <p>The pool is opened in a try-with-resources block so its connections are released
     * once {@code listen(...)} returns or throws; previously the pool was never closed.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events
     * @throws IOException if fetching the partitions or consuming the stream fails
     */
    private static void persistentListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(JDBC_URL);
        hikariConfig.setUsername("postgres");
        hikariConfig.setPassword("postgres");

        try (HikariDataSource dataSource = new HikariDataSource(hikariConfig)) {
            JdbcCursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");

            NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                    .withAccessTokenProvider(new ZignAccessTokenProvider())
                    .withCursorManager(cursorManager)
                    .build();

            // Raw List kept on purpose: the partition element type is not imported in this file.
            List partitions = nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED);

            nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                    .readFromBegin(partitions)
                    .withObjectMapper(objectMapper)
                    .listen(SalesOrderPlaced.class, listener);
        }
    }

    /**
     * Simulates 12 consumer instances ("consumer-0" .. "consumer-11") inside one JVM.
     *
     * <p>Each instance gets its own JDBC partition manager, JDBC cursor manager and Nakadi
     * client on a shared Hikari pool. An instance's task runs with a fixed delay of one
     * second: it tries to lock the partitions of the order-placed event type and, when a
     * lock is obtained, streams events to the listener and always unlocks afterwards.
     *
     * <p>After a 60 second window the executor is shut down and the method waits for the
     * in-flight tasks to finish. Previously {@code awaitTermination} was called without any
     * shutdown request, so it could only end by interruption, and neither the executor nor
     * the connection pool was ever released. Now the executor is always stopped (forcibly
     * on interruption or failure) before the pool is closed.
     *
     * @param objectMapper mapper passed to the stream builder for the event payloads
     * @param listener     receiver of the {@code SalesOrderPlaced} events; shared by all
     *                     simulated instances
     * @throws IOException if fetching the partitions for one of the instances fails
     */
    private static void multiInstanceListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(JDBC_URL);
        hikariConfig.setUsername("postgres");
        hikariConfig.setPassword("postgres");

        // The pool is the outer resource so it is closed only after the executor below has
        // been told to stop.
        try (HikariDataSource dataSource = new HikariDataSource(hikariConfig)) {
            ZignAccessTokenProvider accessTokenProvider = new ZignAccessTokenProvider();
            AtomicInteger name = new AtomicInteger();
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
            try {
                for (int i = 0; i < 12; ++i) {
                    String instanceName = "consumer-" + name.getAndIncrement();
                    JdbcPartitionManager partitionManager = new JdbcPartitionManager(dataSource, "fahrschein-demo");
                    JdbcCursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");
                    NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                            .withAccessTokenProvider(accessTokenProvider)
                            .withCursorManager(cursorManager)
                            .build();
                    // Raw List kept on purpose: the partition element type is not imported in this file.
                    List partitions = nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED);

                    IORunnable instance = () -> {
                        IORunnable runnable = () -> {
                            Optional<Lock> optionalLock = partitionManager.lockPartitions(SALES_ORDER_SERVICE_ORDER_PLACED, partitions, instanceName);
                            if (optionalLock.isPresent()) {
                                Lock lock = optionalLock.get();
                                try {
                                    nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                                            .withLock(lock)
                                            .withObjectMapper(objectMapper)
                                            .withStreamParameters(new StreamParameters().withStreamLimit(10))
                                            .withBackoffStrategy(new NoBackoffStrategy())
                                            .listen(SalesOrderPlaced.class, listener);
                                }
                                finally {
                                    // Always give the partitions back, even when listen(...) throws.
                                    partitionManager.unlockPartitions(lock);
                                }
                            }
                        };
                        scheduledExecutorService.scheduleWithFixedDelay(runnable.unchecked(), 0L, 1L, TimeUnit.SECONDS);
                    };
                    scheduledExecutorService.submit(instance.unchecked());
                }

                Thread.sleep(60000L);
                // awaitTermination only completes after a shutdown request; without this
                // call the wait below could never end normally.
                scheduledExecutorService.shutdown();
                scheduledExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                // No-op after a clean termination; on interruption or an IOException from
                // the loop this cancels the scheduled tasks and interrupts running ones.
                scheduledExecutorService.shutdownNow();
            }
        }
    }
}

