Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume provides global control over reading the records from the consumer.

The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx, config) + (::VertxKafkaClient::KafkaConsumer) create(vertx, config, keyType, valueType)

Create a new KafkaConsumer instance

Overloads:

  • + (::VertxKafkaClient::KafkaConsumer) create(vertx, config)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • config (Hash{String => String})
      Kafka consumer configuration
  • + (::VertxKafkaClient::KafkaConsumer) create(vertx, config, keyType, valueType)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • config (Hash{String => String})
      Kafka consumer configuration
    • keyType (Nil)
      class type for the key deserialization
    • valueType (Nil)
      class type for the value deserialization

Returns:



255
256
257
258
259
260
261
262
263
264
265
266
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 255

def self.create(*args)
  if args[0].class.method_defined?(:j_del) && args[1].class == Hash && !block_given? && args[2] == nil && args[3] == nil
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(args[0].j_del,Hash[args[1].map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil)
  elsif args[0].class.method_defined?(:j_del) && args[1].class == Hash && args[2].class == Class && args[3].class == Class && !block_given?
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(args[0].j_del,Hash[args[1].map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(args[2]),::Vertx::Util::Utils.j_class_of(args[3])),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(args[2]), ::Vertx::Util::Utils.v_type_of(args[3]))
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling create(#{args[0]},#{args[1]},#{args[2]},#{args[3]})"
  end
end

Instance Method Details

- (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a list of partition to this consumer.

Due to internal buffering of messages, when reassigning the old set of partitions may remain in effect (as observed by the record handler)} until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of partitions.

Overloads:

  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)


751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 751

def assign(*args)
  if args[0].class == Hash && true
    if (block_given?)
      @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling assign(#{args[0]})"
  end
end

- (self) assignment(handler) { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)


494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 494

def assignment
  if true
    if (block_given?)
      @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling assignment()"
  end
end

- (self) batchHandler(handler) { ... }

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

Yields:

  • handler called when batches of messages are fetched

Returns:

  • (self)


213
214
215
216
217
218
219
220
221
222
223
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 213

def batch_handler
  if true
    @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling batch_handler()"
  end
end

- (void) beginningOffsets(topicPartition, handler) { ... }

This method returns an undefined value.

Get the first offset for the given partitions.

Parameters:

  • topicPartition (Hash{String => Object})
    the partition to get the earliest offset.

Yields:

  • handler called on operation completed. Returns the earliest available offset for the given partition


516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 516

def beginning_offsets(*args)
  if args[0].class == Hash && true
    if (block_given?)
      return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{args[0]})"
  end
end

- (void) close(completionHandler) { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed


328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 328

def close
  if true
    if (block_given?)
      return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling close()"
  end
end

- (void) commit(completionHandler) { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed list of topics and partition.

Yields:

  • handler called on operation completed


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 73

def commit
  if true
    if (block_given?)
      return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling commit()"
  end
end

- (void) committed(topicPartition, handler) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash{String => Object})
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed


432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 432

def committed(*args)
  if args[0].class == Hash && true
    if (block_given?)
      return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future,::Vertx::Util::data_object_type(Java::IoVertxKafkaClientConsumer::OffsetAndMetadata))
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling committed(#{args[0]})"
  end
end

- (self) endHandler(endHandler) { ... }

Yields:

Returns:

  • (self)


611
612
613
614
615
616
617
618
619
620
621
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 611

def end_handler
  if true
    @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(nil) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling end_handler()"
  end
end

- (void) endOffsets(topicPartition, handler) { ... }

This method returns an undefined value.

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

Parameters:

  • topicPartition (Hash{String => Object})
    the partition to get the end offset.

Yields:

  • handler called on operation completed. The end offset for the given partition.


411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 411

def end_offsets(*args)
  if args[0].class == Hash && true
    if (block_given?)
      return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling end_offsets(#{args[0]})"
  end
end

- (self) exceptionHandler(handler) { ... }

Yields:

Returns:

  • (self)


724
725
726
727
728
729
730
731
732
733
734
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 724

def exception_handler
  if true
    @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling exception_handler()"
  end
end

- (self) fetch(amount)

Parameters:

  • amount (Fixnum)

Returns:

  • (self)


647
648
649
650
651
652
653
654
655
656
657
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 647

def fetch(*args)
  if args[0].class == Fixnum && !block_given?
    @j_del.java_method(:fetch, [Java::long.java_class]).call(args[0])
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling fetch(#{args[0]})"
  end
end

- (self) handler(handler) { ... }

Yields:

Returns:

  • (self)


38
39
40
41
42
43
44
45
46
47
48
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 38

def handler
  if true
    @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling handler()"
  end
end

- (void) offsetsForTimes(topicPartition, timestamp, handler) { ... }

This method returns an undefined value.

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

Parameters:

  • topicPartition (Hash{String => Object})
    TopicPartition to query.
  • timestamp (Fixnum)
    Timestamp to be used in the query.

Yields:

  • handler called on operation completed


351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 351

def offsets_for_times(*args)
  if args[0].class == Hash && args[1].class == Fixnum && true
    if (block_given?)
      return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),args[1],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),args[1],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future,::Vertx::Util::data_object_type(Java::IoVertxKafkaClientConsumer::OffsetAndTimestamp))
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{args[0]},#{args[1]})"
  end
end

- (self) partitionsAssignedHandler(handler) { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)


597
598
599
600
601
602
603
604
605
606
607
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 597

def partitions_assigned_handler
  if true
    @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()"
  end
end

- (self) partitionsFor(topic, handler) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String)
    topic partition for which getting partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)


627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 627

def partitions_for(*args)
  if args[0].class == String && true
    if (block_given?)
      @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{args[0]})"
  end
end

- (self) partitionsRevokedHandler(handler) { ... }

Set the handler called when topic partitions are revoked to the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)


536
537
538
539
540
541
542
543
544
545
546
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 536

def partitions_revoked_handler
  if true
    @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) unless !block_given? }))
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()"
  end
end

- (self) pause - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the will continue to observe messages from the given topicPartitions until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will not see messages from the given topicPartitions.

Overloads:

  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)


564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 564

def pause(*args)
  if !block_given? && args[0] == nil
    @j_del.java_method(:pause, []).call()
    return self
  elsif args[0].class == Hash && true
    if (block_given?)
      @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling pause(#{args[0]})"
  end
end

- (void) paused(handler) { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 53

def paused
  if true
    if (block_given?)
      return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling paused()"
  end
end

- (::Vertx::Pipe) pipe

Pause this stream and return a to transfer the elements of this stream to a destination .

The stream will be resumed when the pipe will be wired to a WriteStream.

Returns:



272
273
274
275
276
277
278
279
280
281
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 272

def pipe
  if !block_given?
    return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pipe, []).call(),::Vertx::Pipe,::VertxKafkaClient::KafkaConsumerRecord.j_api_type)
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling pipe()"
  end
end

- (void) pipeTo(dst, handler) { ... }

This method returns an undefined value.

Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Parameters:

Yields:



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 190

def pipe_to(*args)
  if args[0].class.method_defined?(:j_del) && true
    if (block_given?)
      return @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0].j_del,block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0].j_del,block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling pipe_to(#{args[0]})"
  end
end

- (void) poll(timeout, handler) { ... }

This method returns an undefined value.

Executes a poll for getting messages from Kafka

Parameters:

  • timeout (Fixnum)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Yields:

  • handler called after the poll with batch of records (can be empty).


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 112

def poll(*args)
  if args[0].class == Fixnum && true
    if (block_given?)
      return @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future,::VertxKafkaClient::KafkaConsumerRecords.j_api_type)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling poll(#{args[0]})"
  end
end

- (self) pollTimeout(timeout)

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000. Setting timeout to a lower value results in a more 'responsive' client, because it will block for a shorter period if no data is available in the assigned partition and therefore allows subsequent actions to be executed with a shorter delay. At the same time, the client will poll more frequently and thus will potentially create a higher load on the Kafka Broker.

Parameters:

  • timeout (Fixnum)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Returns:

  • (self)


96
97
98
99
100
101
102
103
104
105
106
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 96

def poll_timeout(*args)
  if args[0].class == Fixnum && !block_given?
    @j_del.java_method(:pollTimeout, [Java::long.java_class]).call(args[0])
    return self
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling poll_timeout(#{args[0]})"
  end
end

- (void) position(partition, handler) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash{String => Object})
    The partition to get the position for

Yields:

  • handler called on operation completed


663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 663

def position(*args)
  if args[0].class == Hash && true
    if (block_given?)
      return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling position(#{args[0]})"
  end
end

- (self) resume - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)


376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 376

def resume(*args)
  if !block_given? && args[0] == nil
    @j_del.java_method(:resume, []).call()
    return self
  elsif args[0].class == Hash && true
    if (block_given?)
      @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling resume(#{args[0]})"
  end
end

- (self) seek(topicPartition, offset, completionHandler) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Parameters:

  • topicPartition (Hash{String => Object})
    topic partition for which seek
  • offset (Fixnum)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)


163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 163

def seek(*args)
  if args[0].class == Hash && args[1].class == Fixnum && true
    if (block_given?)
      @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),args[1],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),args[1],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling seek(#{args[0]},#{args[1]})"
  end
end

- (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)


695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 695

def seek_to_beginning(*args)
  if args[0].class == Hash && true
    if (block_given?)
      @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{args[0]})"
  end
end

- (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)


298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 298

def seek_to_end(*args)
  if args[0].class == Hash && true
    if (block_given?)
      @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(args[0])),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_end(#{args[0]})"
  end
end

- (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler}) until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of topics.

Overloads:

  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)


464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 464

def subscribe(*args)
  if args[0].class == String && true
    if (block_given?)
      @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0],block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  elsif args[0].class == Set && true
    if (block_given?)
      @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| element }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(args[0].map { |element| element }),block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling subscribe(#{args[0]})"
  end
end

- (self) subscription(handler) { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)


132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 132

def subscription
  if true
    if (block_given?)
      @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling subscription()"
  end
end

- (self) unsubscribe(completionHandler) { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)


228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 228

def unsubscribe
  if true
    if (block_given?)
      @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return self
    else
      promise = ::Vertx::Util::Utils.promise
      @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise)
      return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil)
    end
  end
  if defined?(super)
    super
  else
    raise ArgumentError, "Invalid arguments when calling unsubscribe()"
  end
end