/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessorFactory;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.ConsumerGroup;
import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.direct.kafka.KafkaAccessor;
import cz.o2.proxima.direct.kafka.KafkaConsumerFactory;
import cz.o2.proxima.direct.kafka.KafkaLogReader;
import cz.o2.proxima.direct.kafka.KafkaWriter;
import cz.o2.proxima.direct.kafka.PartitionWithTopic;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.record.TimestampType;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.storage.internal.AbstractDataAccessorFactory;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.Pair;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalKafkaCommitLogDescriptor
implements DataAccessorFactory {
    /** Logger used by this descriptor and by the nested writer and accessor classes. */
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptor.class);
    private static final long serialVersionUID = 1L;
    /** Configuration key from which {@code Accessor} reads the number of simulated partitions. */
    public static final String CFG_NUM_PARTITIONS = "local-kafka-num-partitions";
    /**
     * Static registry of created accessors: descriptor id -> (storage URI -> accessor).
     * {@code Accessor.readObject} looks accessors up here to re-attach transient state.
     */
    private static final Map<String, Map<URI, Accessor>> ACCESSORS = new ConcurrentHashMap<String, Map<URI, Accessor>>();
    /** Random identifier of this descriptor instance; the key of its entry in {@link #ACCESSORS}. */
    private final String id = UUID.randomUUID().toString();
    /** Function applied to every newly created accessor; its result is what gets cached. */
    private final Function<Accessor, Accessor> accessorModifier;

    /**
     * Creates a descriptor that caches accessors exactly as they are constructed
     * (the accessor modifier is the identity function).
     */
    public LocalKafkaCommitLogDescriptor() {
        this(Function.identity());
    }

    /**
     * Creates a descriptor and registers an empty, synchronized accessor map for it in the
     * static registry under this instance's random id.
     *
     * @param accessorModifier function applied to each accessor created by
     *     {@code createAccessor}; the value it returns is the one cached and handed out
     */
    public LocalKafkaCommitLogDescriptor(Function<Accessor, Accessor> accessorModifier) {
        // Explicit type arguments instead of a raw HashMap keep the registry type-checked.
        ACCESSORS.put(this.id, Collections.synchronizedMap(new HashMap<URI, Accessor>()));
        this.accessorModifier = accessorModifier;
    }

    /**
     * Returns the accessor registered for the family's storage URI, creating it on first use.
     *
     * <p>A new accessor is built from the family's entity, URI and configuration, passed
     * through the accessor modifier, and the modifier's result is cached for this descriptor.
     *
     * @param direct the direct operator (not used by this implementation)
     * @param family attribute family whose storage URI identifies the accessor
     * @return the cached or newly created accessor
     */
    public Accessor createAccessor(DirectDataOperator direct, AttributeFamilyDescriptor family) {
        Map<URI, Accessor> registered = ACCESSORS.get(this.id);
        return registered.computeIfAbsent(
            family.getStorageUri(),
            storageUri ->
                this.accessorModifier.apply(
                    new Accessor(family.getEntity(), storageUri, family.getCfg(), this.id)));
    }

    /**
     * Tells whether this factory handles the given URI.
     *
     * @param uri storage URI to test
     * @return {@code ACCEPT} when the URI scheme is {@code kafka-test}, {@code REJECT} otherwise
     *     (including URIs that have no scheme at all)
     */
    public AbstractDataAccessorFactory.Accept accepts(URI uri) {
        // URI.getScheme() returns null for scheme-less (relative) URIs; comparing from the
        // constant side rejects those instead of throwing NullPointerException.
        return "kafka-test".equals(uri.getScheme())
            ? AbstractDataAccessorFactory.Accept.ACCEPT
            : AbstractDataAccessorFactory.Accept.REJECT;
    }

    /**
     * Writer that appends elements to the in-memory partition lists of an {@link Accessor}
     * and confirms every write immediately through the commit callback.
     */
    public static class LocalKafkaWriter
    extends KafkaWriter {
        private static final long serialVersionUID = 1L;
        /** Number of partitions the partitioner result is folded into. */
        private final int numPartitions;
        /** Id of the owning descriptor; only used in log output. */
        private final String descriptorId;

        /**
         * @param accessor accessor whose {@code written} lists receive the data
         * @param numPartitions number of partitions to distribute elements over
         * @param descriptorId id of the descriptor that created the accessor
         */
        public LocalKafkaWriter(Accessor accessor, int numPartitions, String descriptorId) {
            super((KafkaAccessor)accessor);
            this.descriptorId = descriptorId;
            this.numPartitions = numPartitions;
        }

        /**
         * Appends the element to the list of its target partition and commits successfully.
         *
         * @param data element to store
         * @param callback invoked with {@code (true, null)} once the element is appended
         */
        public void write(StreamElement data, CommitCallback callback) {
            int rawPartitionId = this.accessor.getPartitioner().getPartitionId(data);
            // Clearing the sign bit keeps the modulo result non-negative.
            int partition = (rawPartitionId & Integer.MAX_VALUE) % this.numPartitions;
            Accessor target = (Accessor)this.accessor;
            List<StreamElement> partitionLog = target.written.get(partition);
            partitionLog.add(data);
            // Offset is derived from the list size after the append; it is used for logging only.
            long offset = partitionLog.size() - 1;
            log.debug(
                "Written data {} to LocalKafkaCommitLog descriptorId {} URI {}, partition {} at offset {}",
                data, this.descriptorId, this.getUri(), partition, offset);
            callback.commit(true, null);
        }

        /** Returns the underlying accessor cast to the local {@link Accessor} type. */
        public Accessor getAccessor() {
            return (Accessor)this.accessor;
        }

        /**
         * Returns a serializable factory that creates a new writer with this writer's
         * accessor, partition count and descriptor id.
         */
        public OnlineAttributeWriter.Factory<?> asFactory() {
            String capturedDescriptorId = this.descriptorId;
            int capturedPartitions = this.numPartitions;
            Accessor capturedAccessor = this.getAccessor();
            return (OnlineAttributeWriter.Factory & Serializable)repo -> new LocalKafkaWriter(capturedAccessor, capturedPartitions, capturedDescriptorId);
        }
    }

    /**
     * Log reader that delegates every observation to {@link KafkaLogReader}, logs that the
     * observation started, and remembers the most recently created consumer for tests.
     */
    public static class LocalKafkaLogReader
    extends KafkaLogReader {
        private static final Logger log = LoggerFactory.getLogger(LocalKafkaLogReader.class);
        /** Consumer returned by the latest {@code createConsumer} call, {@code null} before that. */
        private KafkaConsumer<Object, Object> consumer = null;

        /**
         * @param accessor accessor passed to the parent reader
         * @param context context passed to the parent reader
         */
        public LocalKafkaLogReader(KafkaAccessor accessor, Context context) {
            super(accessor, context);
        }

        /** Delegates to the parent implementation and logs the observed partitions. */
        public ObserveHandle observePartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, LogObserver observer) {
            ObserveHandle handle = super.observePartitions(name, partitions, position, stopAtCurrent, observer);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog URI {}", partitions, this.getUri());
            return handle;
        }

        /** Delegates to the parent implementation and logs the consumer name. */
        public ObserveHandle observe(String name, Position position, LogObserver observer) {
            ObserveHandle handle = super.observe(name, position, observer);
            log.debug("Started to observe LocalKafkaCommitLog with URI {} by consumer {}", this.getUri(), name);
            return handle;
        }

        /** Delegates to the parent implementation and logs partitions and consumer name. */
        ObserveHandle observeKafka(@Nullable String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, LogObserver observer) {
            ObserveHandle handle = super.observeKafka(name, partitions, position, stopAtCurrent, observer);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog with URI {} by consumer {}", partitions, this.getUri(), name);
            return handle;
        }

        /** Delegates to the parent implementation and logs the bulk consumer name. */
        public ObserveHandle observeBulk(String name, Position position, LogObserver observer) {
            ObserveHandle handle = super.observeBulk(name, position, observer);
            log.debug("Started to bulk observe LocalKafkaCommitLog with URI {} by {}", this.getUri(), name);
            return handle;
        }

        /** Returns the underlying accessor cast to the local {@link Accessor} type. */
        public Accessor getAccessor() {
            return (Accessor)this.accessor;
        }

        /**
         * Returns the consumer created by the last {@code createConsumer} call.
         *
         * @throws NullPointerException when no consumer has been created yet
         */
        @VisibleForTesting
        KafkaConsumer<Object, Object> getConsumer() {
            return Objects.requireNonNull(this.consumer);
        }

        /** Creates the consumer through the parent and stores it for {@link #getConsumer()}. */
        KafkaConsumer<Object, Object> createConsumer(String name, Collection<Offset> offsets, ConsumerRebalanceListener listener, Position position) {
            KafkaConsumer<Object, Object> created = super.createConsumer(name, offsets, listener, position);
            this.consumer = created;
            return created;
        }

        /**
         * Returns a serializable factory creating a new reader over this reader's accessor
         * and context.
         */
        public CommitLogReader.Factory<?> asFactory() {
            Context capturedContext = this.getContext();
            Accessor capturedAccessor = this.getAccessor();
            return (CommitLogReader.Factory & Serializable)repo -> new LocalKafkaLogReader(capturedAccessor, capturedContext);
        }
    }

    public static class Accessor
    extends KafkaAccessor {
        private static final long serialVersionUID = 1L;
        /** Id of the owning descriptor; readObject uses it to find the registered accessor. */
        String descriptorId;
        /** Number of simulated partitions; 1 unless overridden by configuration in configure. */
        int numPartitions = 1;
        // The collections below are transient: after deserialization readObject re-attaches
        // them from the accessor registered under (descriptorId, URI).
        /** Consumer groups keyed by consumer name. */
        transient Map<String, ConsumerGroup> consumerGroups;
        /** Written elements: outer index is the partition id, inner index is the offset. */
        transient List<List<StreamElement>> written;
        /** Read position of each consumer: partition id -> next offset to poll. */
        transient Map<ConsumerId, Map<Integer, Integer>> consumerOffsets;
        /** Committed offsets keyed by (consumer name, partition id). */
        transient Map<Pair<String, Integer>, AtomicInteger> committedOffsets;

        /**
         * Creates an accessor that shares all in-memory state (consumer groups, written data,
         * consumer offsets, committed offsets) with {@code copy} but is configured from a
         * different configuration map.
         *
         * @param copy accessor whose entity, URI, descriptor id and state are reused
         * @param cfg configuration applied to the new accessor
         */
        Accessor(Accessor copy, Map<String, Object> cfg) {
            super(copy.getEntityDescriptor(), copy.getUri(), cfg);
            this.descriptorId = copy.descriptorId;
            this.numPartitions = copy.numPartitions;
            // Same collection instances as the source accessor, not copies.
            this.consumerGroups = copy.consumerGroups;
            this.written = copy.written;
            this.consumerOffsets = copy.consumerOffsets;
            this.committedOffsets = copy.committedOffsets;
            this.configure(copy.getUri(), cfg);
        }

        /**
         * Creates an accessor with fresh, empty in-memory state and configures it.
         *
         * @param entity entity descriptor passed to the parent accessor
         * @param uri storage URI of the simulated commit log
         * @param cfg configuration map (partition count, serializer class)
         * @param descriptorId id of the descriptor that owns this accessor
         */
        public Accessor(EntityDescriptor entity, URI uri, Map<String, Object> cfg, String descriptorId) {
            super(entity, uri, cfg);
            this.descriptorId = descriptorId;
            // Typed collections instead of raw ArrayList/HashMap keep these fields type-checked.
            this.consumerOffsets = new ConcurrentHashMap<ConsumerId, Map<Integer, Integer>>();
            this.written = Collections.synchronizedList(new ArrayList<List<StreamElement>>());
            this.consumerGroups = new ConcurrentHashMap<String, ConsumerGroup>();
            this.committedOffsets = Collections.synchronizedMap(new HashMap<Pair<String, Integer>, AtomicInteger>());
            this.configure(uri, cfg);
        }

        /**
         * Applies configuration: the partition count (key {@code local-kafka-num-partitions})
         * and the serializer class (key {@code serializer-class}), and makes sure
         * {@code written} holds one list per partition.
         *
         * @param uri URI of this accessor, used for logging only
         * @param cfg configuration map; missing keys leave the current values untouched
         */
        private void configure(URI uri, Map<String, Object> cfg) {
            // Optional.ofNullable already drops a null value, so no extra null filter is needed.
            this.numPartitions = Optional.ofNullable(cfg.get(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS))
                .map(o -> Integer.valueOf(o.toString()))
                .orElse(this.numPartitions);
            // Add only the partition lists that are still missing. The copy constructor shares
            // `written` with the source accessor and calls configure again; starting from zero
            // would append another numPartitions empty lists to the shared list on every copy.
            for (int i = this.written.size(); i < this.numPartitions; ++i) {
                this.written.add(Collections.synchronizedList(new ArrayList<StreamElement>()));
            }
            // NOTE(review): the raw-typed local is kept because the declared generic type of
            // the inherited serializerClass field is not visible from this file.
            Class cls;
            this.serializerClass = cls = Optional.ofNullable(cfg.get("serializer-class")).map(Object::toString).map(c -> Classpath.findClass((String)c, ElementSerializer.class)).orElse(this.serializerClass);
            log.info("Created accessor with URI {} and {} partitions", uri, this.numPartitions);
        }

        /**
         * Creates a consumer factory whose consumers are Mockito mocks backed by this
         * accessor's in-memory data instead of a Kafka broker.
         *
         * <p>Consumers created without a name get a random {@code unnamed-consumer-*} name, a
         * private consumer group and a fixed partition assignment. Named consumers share one
         * {@link ConsumerGroup} per name, which is created on first use.
         *
         * @return factory producing mocked consumers
         */
        public <K, V> KafkaConsumerFactory<K, V> createConsumerFactory() {
            final ElementSerializer serializer = this.getSerializer();
            return new KafkaConsumerFactory<K, V>(this.getUri(), new Properties(), serializer.keySerde(), serializer.valueSerde()){

                /** Creates an unnamed consumer assigned to all partitions. */
                public KafkaConsumer<K, V> create() {
                    return this.create(this.allPartitions());
                }

                /** Creates an unnamed consumer with the given fixed partition assignment. */
                public KafkaConsumer<K, V> create(Collection<Partition> partitions) {
                    String name = "unnamed-consumer-" + UUID.randomUUID().toString();
                    ConsumerGroup group = new ConsumerGroup(name, getTopic(), numPartitions, false);
                    // Unqualified call: mockKafkaConsumer is declared on the enclosing Accessor,
                    // so it must not be qualified with the anonymous factory's `this`.
                    return mockKafkaConsumer(name, group, serializer, partitions, null);
                }

                /** Creates a named consumer without a rebalance listener. */
                public KafkaConsumer<K, V> create(String name) {
                    return this.create(name, null);
                }

                /**
                 * Creates a named consumer that joins the group registered under {@code name},
                 * creating that group when it does not exist yet.
                 */
                public KafkaConsumer<K, V> create(String name, @Nullable ConsumerRebalanceListener listener) {
                    // Lookup-or-create of the group and the registration of the consumer are
                    // serialized on the descriptor class across all accessors.
                    synchronized (LocalKafkaCommitLogDescriptor.class) {
                        ConsumerGroup group = consumerGroups.get(name);
                        if (group == null) {
                            group = new ConsumerGroup(name, getTopic(), numPartitions, true);
                            consumerGroups.put(name, group);
                        }
                        return mockKafkaConsumer(name, group, serializer, null, listener);
                    }
                }

                /** Lists one {@link PartitionWithTopic} per partition id in {@code [0, numPartitions)}. */
                private List<Partition> allPartitions() {
                    ArrayList<Partition> ret = new ArrayList<Partition>();
                    for (int id = 0; id < numPartitions; ++id) {
                        ret.add((Partition)new PartitionWithTopic(getTopic(), id));
                    }
                    return ret;
                }
            };
        }

        /**
         * Builds a Mockito mock of {@code KafkaConsumer} whose stubbed methods operate on this
         * accessor's in-memory state.
         *
         * <p>The consumer is first registered in {@code group}, either with the explicit
         * partition assignment or with the rebalance listener. Its starting offset for each
         * assigned partition is the committed offset of {@code name} when one exists, otherwise
         * the current size of that partition's data (i.e. only elements written later are read).
         *
         * @param name consumer (group) name, used for committed offsets and logging
         * @param group consumer group the new consumer joins
         * @param serializer serializer used to turn stored elements into consumer records
         * @param assignedPartitions fixed assignment, or {@code null} to register with {@code listener}
         * @param listener rebalance listener, may be {@code null}
         * @return the configured mock
         */
        <K, V> KafkaConsumer<K, V> mockKafkaConsumer(String name, ConsumerGroup group, ElementSerializer<K, V> serializer, @Nullable Collection<Partition> assignedPartitions, @Nullable ConsumerRebalanceListener listener) {
            log.info("Creating mock kafka consumer name {}, with committed offsets {}", (Object)name, this.committedOffsets);
            KafkaConsumer mock = (KafkaConsumer)Mockito.mock(KafkaConsumer.class);
            int assignedId = assignedPartitions != null ? group.add(assignedPartitions) : group.add(listener);
            // Set by poll/seek/seekToBeginning; assignment() reports an empty set until then.
            AtomicBoolean polled = new AtomicBoolean();
            ConsumerId consumerId = ConsumerId.of(name, assignedId);
            // Initial position per assigned partition: committed offset, or end of data if none.
            this.consumerOffsets.put(consumerId, group.getAssignment(consumerId.getId()).stream().map(p -> {
                int off = this.getCommittedOffset(name, p.getId());
                off = off >= 0 ? off : this.written.get(p.getId()).size();
                return Pair.of((Object)p.getId(), (Object)off);
            }).collect(Collectors.toConcurrentMap(Pair::getFirst, Pair::getSecond)));
            // poll(Duration): delegate to pollConsumer with the timeout in milliseconds.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                polled.set(true);
                Duration sleep = (Duration)invocation.getArguments()[0];
                return this.pollConsumer(group, sleep.toMillis(), consumerId, serializer, listener);
            }).when((Object)mock)).poll((Duration)Mockito.any());
            // endOffsets: current size of each requested partition.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                Collection tp = (Collection)invocation.getArguments()[0];
                return this.getEndOffsets(name, tp);
            }).when((Object)mock)).endOffsets((Collection)Mockito.any());
            // beginningOffsets: always 0 for every requested partition.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                Collection c = (Collection)invocation.getArguments()[0];
                Map<TopicPartition, Long> starts = c.stream().collect(Collectors.toMap(i -> i, i -> 0L));
                log.debug("Consumer {} beginningOffsets {}: {}", new Object[]{name, c, starts});
                return starts;
            }).when((Object)mock)).beginningOffsets((Collection)Mockito.any());
            // seekToBeginning: reset the consumer's position to 0 on the given partitions.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                Collection parts = (Collection)invocation.getArguments()[0];
                this.seekConsumerToBeginning(consumerId, parts);
                polled.set(true);
                return null;
            }).when((Object)mock)).seekToBeginning((Collection)Mockito.any());
            // seek: move the consumer's position on one partition to the given offset.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                TopicPartition tp = (TopicPartition)invocation.getArguments()[0];
                long offset = (Long)invocation.getArguments()[1];
                this.seekConsumerTo(consumerId, tp.partition(), offset);
                polled.set(true);
                return null;
            }).when((Object)mock)).seek((TopicPartition)Mockito.any(), Mockito.anyLong());
            // commitSync(Map): store the committed offsets under the consumer name.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                Map commitMap = (Map)invocation.getArguments()[0];
                this.commitConsumer(name, commitMap);
                return null;
            }).when((Object)mock)).commitSync((Map)Mockito.any());
            // committed(Set): only partitions with a committed offset (>= 0) appear in the result.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                Set parts = (Set)invocation.getArguments()[0];
                return parts.stream().map(tp -> {
                    int off = this.getCommittedOffset(name, tp.partition());
                    if (off >= 0) {
                        return Pair.of((Object)tp, (Object)new OffsetAndMetadata((long)off));
                    }
                    return Pair.of((Object)tp, null);
                }).filter(p -> p.getSecond() != null).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
            }).when((Object)mock)).committed((Set)Mockito.any());
            // assignment(): the group's assignment for this consumer once `polled` is set, else empty.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> polled.get() ? group.getAssignment(consumerId.getId()).stream().map(p -> new TopicPartition(this.getTopic(), p.getId())).collect(Collectors.toSet()) : Collections.emptySet()).when((Object)mock)).assignment();
            // partitionsFor(topic): one PartitionInfo per partition of the group's topic.
            Mockito.when((Object)mock.partitionsFor((String)Mockito.eq((Object)group.getTopic()))).thenReturn(IntStream.range(0, group.getNumPartitions()).mapToObj(i -> new PartitionInfo(group.getTopic(), i, null, null, null)).collect(Collectors.toList()));
            // close(): remove this consumer from its group.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                group.remove(consumerId.getId());
                return null;
            }).when((Object)mock)).close();
            // position(tp): the consumer's current offset on the partition, or -1 when unknown.
            ((KafkaConsumer)Mockito.doAnswer(invocation -> {
                TopicPartition tp = (TopicPartition)invocation.getArguments()[0];
                return (long)Optional.ofNullable(this.consumerOffsets.get(consumerId).get(tp.partition())).orElse(-1).intValue();
            }).when((Object)mock)).position((TopicPartition)Mockito.any());
            return mock;
        }

        /**
         * Looks up the offset committed by the named consumer for a partition.
         *
         * @param name consumer name
         * @param partition partition id
         * @return the committed offset, or {@code -1} when nothing has been committed
         */
        private int getCommittedOffset(String name, int partition) {
            AtomicInteger stored = this.committedOffsets.get(Pair.of(name, partition));
            return stored == null ? -1 : stored.get();
        }

        /**
         * Records the given offsets as committed for the named consumer, creating an entry
         * for each (name, partition) pair on first commit and overwriting it afterwards.
         * The whole update runs while holding the monitor of {@code committedOffsets}.
         *
         * @param name consumer name
         * @param commitMap offsets to commit, keyed by topic partition
         */
        private void commitConsumer(String name, Map<TopicPartition, OffsetAndMetadata> commitMap) {
            synchronized (this.committedOffsets) {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> commit : commitMap.entrySet()) {
                    int partition = commit.getKey().partition();
                    long offset = commit.getValue().offset();
                    this.committedOffsets.compute(Pair.of(name, partition), (key, current) -> {
                        if (current != null) {
                            current.set((int)offset);
                            return current;
                        }
                        return new AtomicInteger((int)offset);
                    });
                }
            }
            log.debug("Consumer {} committed offsets {}, offsets now {}", name, commitMap, this.committedOffsets);
        }

        /**
         * Moves the consumer's read position on one partition to the given offset.
         *
         * @param consumerId consumer whose position is changed
         * @param partition partition id
         * @param offset new position; must not be negative
         * @throws IllegalArgumentException when {@code offset} is negative
         */
        private void seekConsumerTo(ConsumerId consumerId, int partition, long offset) {
            Preconditions.checkArgument(offset >= 0L, "Cannot seek to negative offset %s", offset);
            this.consumerOffsets.get(consumerId).put(partition, (int)offset);
            log.debug("Consumer {} seeked to offset {} in partition {}", consumerId, offset, partition);
        }

        /**
         * Resets the consumer's read position to offset 0 on each of the given partitions.
         *
         * @param consumerId consumer whose positions are reset
         * @param parts partitions to rewind
         */
        private void seekConsumerToBeginning(ConsumerId consumerId, Collection<TopicPartition> parts) {
            this.consumerOffsets.compute(consumerId, (id, positions) -> {
                for (TopicPartition tp : parts) {
                    positions.put(tp.partition(), 0);
                }
                return positions;
            });
            log.debug("Consumer {} seeked to beginning of {}", consumerId.getName(), parts);
        }

        /**
         * Computes the end offset of each requested partition, i.e. the number of elements
         * currently stored for it. Result keys use this accessor's topic.
         *
         * @param name consumer name, used for logging only
         * @param tp partitions to query
         * @return map from topic partition to its end offset
         */
        private Map<TopicPartition, Long> getEndOffsets(String name, Collection<TopicPartition> tp) {
            HashMap<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
            tp.forEach(requested -> {
                TopicPartition key = new TopicPartition(this.getTopic(), requested.partition());
                long end = this.written.get(requested.partition()).size();
                endOffsets.put(key, end);
            });
            log.debug("Consumer {} endOffsets {}: {}", name, tp, endOffsets);
            return endOffsets;
        }

        /**
         * Simulates one {@code poll} of a mocked consumer.
         *
         * <p>On the consumer's first poll the group is rebalanced if needed; when no rebalance
         * happened and a listener is present, the listener is told about the consumer's current
         * assignment. The method then sleeps for {@code period} milliseconds and afterwards reads,
         * for every assigned partition, elements from the consumer's current position up to the
         * end of the partition, limited by the accessor's max-poll-records budget shared across
         * all partitions of this call. The consumer's position is advanced past what was read.
         *
         * <p>The whole call, including the sleep, runs while holding the monitor of
         * {@code consumerId}.
         *
         * @param group consumer group of the polling consumer
         * @param period time to sleep before reading, in milliseconds
         * @param consumerId identity of the polling consumer
         * @param serializer serializer used to build consumer records
         * @param listener rebalance listener to notify on first poll, may be {@code null}
         * @return polled records grouped by topic partition; partitions without records are omitted
         * @throws InterruptedException when the thread is interrupted while sleeping
         */
        private <K, V> ConsumerRecords<K, V> pollConsumer(ConsumerGroup group, long period, ConsumerId consumerId, ElementSerializer<K, V> serializer, @Nullable ConsumerRebalanceListener listener) throws InterruptedException {
            synchronized (consumerId) {
                String name = consumerId.getName();
                if (!consumerId.isAssigned()) {
                    log.debug("Initializing consumer {} after first time poll with listener {}", name, listener);
                    if (!group.rebalanceIfNeeded() && listener != null) {
                        listener.onPartitionsAssigned(group.getAssignment(consumerId.getId()).stream().map(p -> new TopicPartition(this.getTopic(), p.getId())).collect(Collectors.toList()));
                    }
                    consumerId.setAssigned(true);
                }
                log.debug("Sleeping {} ms before attempting to poll", period);
                Thread.sleep(period);
                Map<TopicPartition, List<ConsumerRecord<K, V>>> polled = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
                // Snapshot of the assignment so a concurrent rebalance cannot change it mid-poll.
                List<Partition> assignment = Lists.newArrayList(group.getAssignment(consumerId.getId()));
                Map<Integer, Integer> offsets = this.consumerOffsets.get(consumerId);
                if (log.isDebugEnabled()) {
                    log.debug("Polling consumerId {}.{} with assignment {} and offsets {}", new Object[]{this.descriptorId, consumerId, assignment.stream().map(Partition::getId).collect(Collectors.toList()), offsets});
                }
                int maxToPoll = this.getMaxPollRecords();
                for (Partition part : assignment) {
                    int partition = part.getId();
                    if (partition >= this.written.size()) {
                        // No data list for this partition (e.g. after clear()); nothing to read.
                        continue;
                    }
                    List<StreamElement> partitionData = this.written.get(partition);
                    int last = partitionData.size();
                    List<ConsumerRecord<K, V>> records = new ArrayList<ConsumerRecord<K, V>>();
                    // Resolve the starting offset BEFORE logging it: the previous code read the
                    // variable in the trace statement ahead of its assignment.
                    int off = Optional.ofNullable(offsets.get(partition)).orElse(this.getCommittedOffset(name, partition)).intValue();
                    log.trace("Partition {} has last {}, reading from {}", partition, last, off);
                    while (off < last && maxToPoll-- > 0) {
                        // A negative offset (no position and nothing committed) is skipped but
                        // still advances, matching the original loop's behavior.
                        if (off >= 0) {
                            records.add(this.toConsumerRecord(partitionData.get(off), serializer, partition, off));
                        }
                        ++off;
                    }
                    offsets.put(partition, off);
                    log.trace("Advanced offset of consumer ID {} on partition {} to {}", consumerId, partition, off);
                    if (!records.isEmpty()) {
                        polled.put(new TopicPartition(this.getTopic(), partition), records);
                    }
                }
                log.debug("Consumer {} id {} polled records {}", name, consumerId, polled);
                return new ConsumerRecords<K, V>(polled);
            }
        }

        /**
         * Converts a stored element into a {@code ConsumerRecord} for this accessor's topic.
         * Key and value sizes are obtained by serializing them with the serializer's serdes;
         * the value size is 0 for delete elements.
         *
         * @param ingest element to convert
         * @param serializer serializer producing the record key and value
         * @param partitionId partition the record belongs to
         * @param offset offset of the record within the partition
         * @return the consumer record, timestamped with the element's stamp
         */
        private <K, V> ConsumerRecord<K, V> toConsumerRecord(StreamElement ingest, ElementSerializer<K, V> serializer, int partitionId, int offset) {
            Pair<K, V> keyValue = serializer.write(ingest);
            int keySize = serializer.keySerde().serializer().serialize(this.getTopic(), keyValue.getFirst()).length;
            int valueSize;
            if (ingest.isDelete()) {
                valueSize = 0;
            } else {
                valueSize = serializer.valueSerde().serializer().serialize(this.getTopic(), keyValue.getSecond()).length;
            }
            return new ConsumerRecord<K, V>(this.getTopic(), partitionId, (long)offset, ingest.getStamp(), TimestampType.CREATE_TIME, 0L, keySize, valueSize, keyValue.getFirst(), keyValue.getSecond());
        }

        /**
         * Checks whether consumers have read everything written so far, using the current
         * size of each list in {@code written} as the target offsets.
         *
         * @return result of {@link #allConsumed(List)} for the current sizes
         */
        public boolean allConsumed() {
            List<Integer> writtenSizes = this.written.stream().map(List::size).collect(Collectors.toList());
            return this.allConsumed(writtenSizes);
        }

        /**
         * Checks, for every registered consumer group name, whether the recorded read
         * positions have reached the given offsets.
         *
         * <p>For list index {@code i} the position is looked up under
         * {@code ConsumerId.of(name, i)} for partition {@code i}; a missing position for that
         * partition counts as consumed. Every group name is evaluated (no short-circuit).
         *
         * @param untilOffsets target offset per partition, indexed by partition id
         * @return {@code true} when no inspected position is below its target offset
         */
        public boolean allConsumed(List<Integer> untilOffsets) {
            return this.consumerGroups.keySet().stream().map(groupName -> {
                int nextPartition = 0;
                for (int target : untilOffsets) {
                    int current = nextPartition++;
                    Integer position = this.consumerOffsets.get(ConsumerId.of(groupName, current)).get(current);
                    if (position != null && position < target) {
                        return false;
                    }
                }
                return true;
            }).reduce(true, Boolean::logicalAnd);
        }

        /** Creates a writer over this accessor using its partition count and descriptor id. */
        LocalKafkaWriter newWriter() {
            return new LocalKafkaWriter(this, numPartitions, descriptorId);
        }

        /**
         * Creates a log reader over this accessor.
         *
         * @param context context handed to the reader
         */
        LocalKafkaLogReader newReader(Context context) {
            LocalKafkaLogReader reader = new LocalKafkaLogReader(this, context);
            return reader;
        }

        /**
         * Serialization hook: writes only the default (non-transient) fields.
         *
         * @param oos stream to write to
         * @throws IOException when the default serialization fails
         */
        private void writeObject(ObjectOutputStream oos) throws IOException {
            oos.defaultWriteObject();
        }

        /**
         * Deserialization hook: restores the default fields and then re-attaches the transient
         * in-memory state from the accessor registered under this accessor's descriptor id
         * and URI, so the deserialized instance shares data with the original.
         *
         * @param ois stream to read from
         * @throws ClassNotFoundException when a serialized class cannot be resolved
         * @throws IOException when reading the stream fails
         */
        private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
            ois.defaultReadObject();
            Map<URI, Accessor> registered = ACCESSORS.get(this.descriptorId);
            Accessor original = registered.get(this.getUri());
            this.committedOffsets = original.committedOffsets;
            this.consumerGroups = original.consumerGroups;
            this.consumerOffsets = original.consumerOffsets;
            this.written = original.written;
        }

        /**
         * Empties all in-memory state: consumer offsets, written data (the per-partition
         * lists themselves are removed from {@code written}), consumer groups and committed
         * offsets.
         */
        public void clear() {
            consumerOffsets.clear();
            written.clear();
            consumerGroups.clear();
            committedOffsets.clear();
        }
    }

    /**
     * Identity of a mocked consumer: the consumer name plus the numeric id assigned by its
     * group. Equality and hash code use name and id only; the mutable {@code assigned} flag
     * is not part of the identity.
     */
    private static class ConsumerId {
        final String name;
        final int id;
        /** Whether the consumer has gone through its first-poll initialization. */
        boolean assigned = false;

        /** Factory method creating an id for the given name and numeric id. */
        static ConsumerId of(String name, int id) {
            return new ConsumerId(name, id);
        }

        private ConsumerId(String name, int id) {
            this.id = id;
            this.name = name;
        }

        public int hashCode() {
            return Objects.hash(this.name, this.id);
        }

        public boolean equals(Object obj) {
            if (obj instanceof ConsumerId) {
                ConsumerId that = (ConsumerId)obj;
                return this.id == that.id && Objects.equals(this.name, that.name);
            }
            return false;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("ConsumerId(");
            text.append(this.name).append(", ").append(this.id).append(")");
            return text.toString();
        }

        public String getName() {
            return this.name;
        }

        public int getId() {
            return this.id;
        }

        public boolean isAssigned() {
            return this.assigned;
        }

        public void setAssigned(boolean assigned) {
            this.assigned = assigned;
        }
    }
}

