Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume methods, called without arguments, provide global control over reading the records from the consumer.

The #pause and #resume overloads that take a topic partition (or a set of topic partitions) provide finer-grained control over reading records for specific topics/partitions; these are Kafka-specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaConsumer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka consumer configuration
  • keyType (Nil) (defaults to: nil)
    class type for the key deserialization
  • valueType (Nil) (defaults to: nil)
    class type for the value deserialization

Returns:

Raises:

  • (ArgumentError)


66
67
68
69
70
71
72
73
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 66

# Create a new KafkaConsumer instance.
#
# Two call shapes are accepted, both without a block:
# * create(vertx, config)                     -- keyType and valueType left nil
# * create(vertx, config, keyType, valueType) -- both given as Class objects
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use (its class must define j_del)
# @param [Hash{String => String}] config Kafka consumer configuration
# @param [Class] keyType class type for the key deserialization
# @param [Class] valueType class type for the value deserialization
# @return [::VertxKafkaClient::KafkaConsumer] wrapper around the created Java consumer
# @raise [ArgumentError] when the arguments match neither call shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  # Requirements shared by both overloads.
  common_ok = vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given?
  untyped = common_ok && keyType == nil && valueType == nil
  typed = common_ok && keyType.class == Class && valueType.class == Class
  unless untyped || typed
    raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
  end

  j_factory = Java::IoVertxKafkaClientConsumer::KafkaConsumer
  if untyped
    j_signature = [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]
    j_consumer = j_factory.java_method(:create, j_signature).call(vertx.j_del,Hash[config.map { |key,value| [key,value] }])
    return ::Vertx::Util::Utils.safe_create(j_consumer,::VertxKafkaClient::KafkaConsumer, nil, nil)
  end

  # Typed overload: the key/value classes go to Java and to the Ruby wrapper.
  j_signature = [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]
  j_consumer = j_factory.java_method(:create, j_signature).call(
    vertx.j_del,
    Hash[config.map { |key,value| [key,value] }],
    ::Vertx::Util::Utils.j_class_of(keyType),
    ::Vertx::Util::Utils.j_class_of(valueType)
  )
  ::Vertx::Util::Utils.safe_create(j_consumer,::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType))
end

Instance Method Details

- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a partition, or a set of partitions, to this consumer.

Due to internal buffering of messages, when reassigning, the old set of partitions may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new set of partitions.

Overloads:

  • - (self) assign(topicPartition)

    Parameters:

    • topicPartition (Hash)
      partition which want assigned
  • - (self) assign(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned
  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 251

# Manually assign a partition, or a set of partitions, to this consumer.
#
# Accepted call shapes:
# * assign(topicPartition)  -- a Hash
# * assign(topicPartitions) -- a Set of Hash
# * either of the above with a block, called on completion with the failure
#   cause (nil when the operation did not fail)
#
# @param [Hash,Set<Hash>] param_1 partition(s) to assign
# @yield handler called on operation completed
# @return [self]
# @raise [ArgumentError] when param_1 is neither exactly a Hash nor exactly a Set
def assign(param_1=nil)
  for_single = param_1.class == Hash
  unless for_single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
  end

  # Pick the Java overload: TopicPartition or Set, optionally followed by a Handler.
  j_signature = [for_single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_assign = @j_del.java_method(:assign, j_signature)

  j_target =
    if for_single
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_assign.call(j_target,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    j_assign.call(j_target)
  end
  self
end

- (self) assignment { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


270
271
272
273
274
275
276
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 270

# Get the set of partitions currently assigned to this consumer.
#
# @yield [cause, partitions] handler called on operation completed: cause is the
#   failure cause (nil unless the result failed); partitions is nil unless the
#   result succeeded, otherwise the result converted with Utils.to_set and each
#   non-nil element replaced by JSON.parse of its toJson encoding
# @return [self]
# @raise [ArgumentError] when called without a block
def assignment
  raise ArgumentError, "Invalid arguments when calling assignment()" unless block_given?

  j_assignment = @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class])
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(cause, partitions)
  end
  j_assignment.call(on_done)
  self
end

- (self) batch_handler { ... }

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

Yields:

  • handler called when batches of messages are fetched

Returns:

  • (self)

Raises:

  • (ArgumentError)


462
463
464
465
466
467
468
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 462

# Set the handler to be used when batches of messages are fetched from the
# Kafka server. Batch handlers need to take care not to block the event loop
# when dealing with large batches.
#
# @yield [records] handler called when batches of messages are fetched; each
#   event is wrapped as ::VertxKafkaClient::KafkaConsumerRecords via Utils.safe_create
# @return [self]
# @raise [ArgumentError] when called without a block
def batch_handler
  raise ArgumentError, "Invalid arguments when calling batch_handler()" unless block_given?

  j_batch_handler = @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class])
  j_batch_handler.call((Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil))
  end))
  self
end

- (void) beginning_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the first offset for the given partitions.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the earliest offset.

Yields:

  • handler called on operation completed. Returns the earliest available offset for the given partition

Raises:

  • (ArgumentError)


506
507
508
509
510
511
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 506

# Get the first offset for the given partition.
#
# @param [Hash] topicPartition the partition to get the earliest offset
# @yield [cause, offset] handler called on operation completed: cause is the
#   failure cause (nil unless failed), offset is the result (nil unless succeeded)
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless topicPartition is exactly a Hash and a block is given
def beginning_offsets(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})"
  end

  j_beginning_offsets = @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  j_beginning_offsets.call(j_partition,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
end

- (void) close { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


472
473
474
475
476
477
478
479
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 472

# Close the consumer.
#
# @yield [cause] optional handler called on operation completed, with the
#   failure cause (nil unless the result failed)
# @return whatever the underlying Java call returns (documented as void)
def close
  if block_given?
    j_close = @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class])
    return j_close.call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  elsif !block_given?
    return @j_del.java_method(:close, []).call()
  end
  # Not reachable: the two branches above are exhaustive and both return.
  raise ArgumentError, "Invalid arguments when calling close()"
end

- (void) commit { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed topics and partitions.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


427
428
429
430
431
432
433
434
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 427

# Commit current offsets for all the subscribed topics and partitions.
#
# @yield [cause] optional handler called on operation completed, with the
#   failure cause (nil unless the result failed)
# @return whatever the underlying Java call returns (documented as void)
def commit
  if block_given?
    j_commit = @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class])
    return j_commit.call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  elsif !block_given?
    return @j_del.java_method(:commit, []).call()
  end
  # Not reachable: the two branches above are exhaustive and both return.
  raise ArgumentError, "Invalid arguments when calling commit()"
end

- (void) committed(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


439
440
441
442
443
444
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 439

# Get the last committed offset for the given partition (whether the commit
# happened by this process or another).
#
# @param [Hash] topicPartition topic partition for getting last committed offset
# @yield [cause, offset] handler called on operation completed: cause is the
#   failure cause (nil unless failed); offset is JSON.parse of the result's
#   toJson encoding, or nil when the result failed or is nil
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless topicPartition is exactly a Hash and a block is given
def committed(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
  end

  j_committed = @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    offset = nil
    offset = JSON.parse(ar.result.toJson.encode) if ar.succeeded && ar.result != nil
    yield(cause, offset)
  end
  j_committed.call(j_partition,on_done)
end

- (Fixnum) demand

Returns the current demand.

  • If the stream is in flowing mode, will return MAX_VALUE.
  • If the stream is in fetch mode, will return the current number of elements still to be delivered, or 0 if paused.

Returns:

  • (Fixnum)
    current demand

Raises:

  • (ArgumentError)


189
190
191
192
193
194
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 189

# Returns the current demand, as reported by the underlying Java consumer.
#
# @return [Fixnum] current demand
# @raise [ArgumentError] when called with a block
def demand
  raise ArgumentError, "Invalid arguments when calling demand()" if block_given?

  j_demand = @j_del.java_method(:demand, [])
  j_demand.call()
end

- (self) end_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


166
167
168
169
170
171
172
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 166

# Register the block as the end handler on the underlying Java consumer.
# The block is invoked without arguments.
#
# @yield called by the end handler
# @return [self]
# @raise [ArgumentError] when called without a block
def end_handler
  raise ArgumentError, "Invalid arguments when calling end_handler()" unless block_given?

  j_end_handler = @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class])
  j_end_handler.call(Proc.new { yield })
  self
end

- (void) end_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the end offset.

Yields:

  • handler called on operation completed. The end offset for the given partition.

Raises:

  • (ArgumentError)


517
518
519
520
521
522
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 517

# Get the last offset for the given partition. The last offset of a partition
# is the offset of the upcoming message, i.e. the offset of the last available
# message + 1.
#
# @param [Hash] topicPartition the partition to get the end offset
# @yield [cause, offset] handler called on operation completed: cause is the
#   failure cause (nil unless failed), offset is the result (nil unless succeeded)
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless topicPartition is exactly a Hash and a block is given
def end_offsets(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})"
  end

  j_end_offsets = @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  j_end_offsets.call(j_partition,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


76
77
78
79
80
81
82
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 76

# Register the block as the exception handler on the underlying Java consumer.
#
# @yield [error] called with each event converted by Utils.from_throwable
# @return [self]
# @raise [ArgumentError] when called without a block
def exception_handler
  raise ArgumentError, "Invalid arguments when calling exception_handler()" unless block_given?

  j_exception_handler = @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class])
  j_exception_handler.call((Proc.new do |event|
    yield(::Vertx::Util::Utils.from_throwable(event))
  end))
  self
end

- (self) fetch(amount = nil)

Parameters:

  • amount (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)


175
176
177
178
179
180
181
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 175

# Forward a fetch request for +amount+ to the underlying Java consumer
# (its fetch(long) method).
#
# @param [Integer] amount value passed to the Java fetch call
# @return [self]
# @raise [ArgumentError] when amount is not an Integer or a block is given
def fetch(amount=nil)
  # Integer rather than the legacy Fixnum constant: Fixnum was deprecated in
  # Ruby 2.4 and removed in Ruby 3.2, where referencing it raises NameError.
  unless amount.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling fetch(#{amount})"
  end

  @j_del.java_method(:fetch, [Java::long.java_class]).call(amount)
  self
end

- (self) handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


85
86
87
88
89
90
91
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 85

# Register the block as the record handler on the underlying Java consumer.
#
# @yield [record] called with each event wrapped as
#   ::VertxKafkaClient::KafkaConsumerRecord via Utils.safe_create
# @return [self]
# @raise [ArgumentError] when called without a block
def handler
  raise ArgumentError, "Invalid arguments when calling handler()" unless block_given?

  j_handler = @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class])
  j_handler.call((Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil))
  end))
  self
end

- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }

This method returns an undefined value.

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    TopicPartition to query.
  • timestamp (Fixnum) (defaults to: nil)
    Timestamp to be used in the query.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


496
497
498
499
500
501
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 496

# Look up the offset for the given partition by timestamp. The result might be
# nil when no offset can be found for the given timestamp, e.g. when the
# timestamp refers to the future.
#
# @param [Hash] topicPartition TopicPartition to query
# @param [Integer] timestamp Timestamp to be used in the query
# @yield [cause, offset] handler called on operation completed: cause is the
#   failure cause (nil unless failed); offset is JSON.parse of the result's
#   toJson encoding, or nil when the result failed or is nil
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless topicPartition is exactly a Hash, timestamp is
#   an Integer and a block is given
def offsets_for_times(topicPartition=nil,timestamp=nil)
  # Integer rather than the legacy Fixnum constant: Fixnum was deprecated in
  # Ruby 2.4 and removed in Ruby 3.2, where referencing it raises NameError.
  unless topicPartition.class == Hash && timestamp.is_a?(Integer) && block_given?
    raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})"
  end

  j_offsets_for_times = @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    offset = nil
    offset = JSON.parse(ar.result.toJson.encode) if ar.succeeded && ar.result != nil
    yield(cause, offset)
  end
  j_offsets_for_times.call(j_partition,timestamp,on_done)
end

- (self) partitions_assigned_handler { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


322
323
324
325
326
327
328
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 322

# Set the handler called when topic partitions are assigned to the consumer.
#
# @yield [partitions] handler called on assigned topic partitions: the event
#   converted with Utils.to_set, each non-nil element replaced by JSON.parse of
#   its toJson encoding
# @return [self]
# @raise [ArgumentError] when called without a block
def partitions_assigned_handler
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" unless block_given?

  j_assigned_handler = @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class])
  on_assigned = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    yield(partitions)
  end
  j_assigned_handler.call(on_assigned)
  self
end

- (self) partitions_for(topic = nil) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic for which to get partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


449
450
451
452
453
454
455
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 449

# Get metadata about the partitions for a given topic.
#
# @param [String] topic topic for which to get partitions info
# @yield [cause, partitions] handler called on operation completed: cause is the
#   failure cause (nil unless failed); partitions is nil unless the result
#   succeeded, otherwise an Array with each non-nil element replaced by
#   JSON.parse of its toJson encoding
# @return [self]
# @raise [ArgumentError] unless topic is exactly a String and a block is given
def partitions_for(topic=nil)
  unless topic.class == String && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  j_partitions_for = @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class])
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(cause, partitions)
  end
  j_partitions_for.call(topic,on_done)
  self
end

- (self) partitions_revoked_handler { ... }

Set the handler called when topic partitions are revoked from the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


312
313
314
315
316
317
318
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 312

# Set the handler called when topic partitions are revoked from the consumer.
#
# @yield [partitions] handler called on revoked topic partitions: the event
#   converted with Utils.to_set, each non-nil element replaced by JSON.parse of
#   its toJson encoding
# @return [self]
# @raise [ArgumentError] when called without a block
def partitions_revoked_handler
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" unless block_given?

  j_revoked_handler = @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class])
  on_revoked = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    yield(partitions)
  end
  j_revoked_handler.call(on_revoked)
  self
end

- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the record #handler will continue to observe messages from the given topicPartitions until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will not see messages from the given topicPartitions.

Overloads:

  • - (self) pause(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching
  • - (self) pause(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching
  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 113

# Suspend fetching, either globally or from the requested partitions.
#
# Accepted call shapes:
# * pause                   -- no argument, no block: calls the no-arg Java pause
# * pause(topicPartition)   -- a Hash
# * pause(topicPartitions)  -- a Set of Hash
# * either partition form with a block, called on completion with the failure
#   cause (nil when the operation did not fail)
#
# @param [Hash,Set<Hash>] param_1 topic partition(s) from which suspend fetching
# @yield handler called on operation completed
# @return [self]
# @raise [ArgumentError] for any other argument combination (including a block
#   with no partition argument)
def pause(param_1=nil)
  if !block_given? && param_1 == nil
    # Global pause: no partition given.
    @j_del.java_method(:pause, []).call()
    return self
  end

  for_single = param_1.class == Hash
  unless for_single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
  end

  # Pick the Java overload: TopicPartition or Set, optionally followed by a Handler.
  j_signature = [for_single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_pause = @j_del.java_method(:pause, j_signature)

  j_target =
    if for_single
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_pause.call(j_target,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    j_pause.call(j_target)
  end
  self
end

- (void) paused { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


303
304
305
306
307
308
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 303

# Get the set of partitions that were previously paused by a call to pause(Set).
#
# @yield [cause, partitions] handler called on operation completed: cause is the
#   failure cause (nil unless failed); partitions is nil unless the result
#   succeeded, otherwise the result converted with Utils.to_set and each
#   non-nil element replaced by JSON.parse of its toJson encoding
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] when called without a block
def paused
  raise ArgumentError, "Invalid arguments when calling paused()" unless block_given?

  j_paused = @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class])
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(cause, partitions)
  end
  j_paused.call(on_done)
end

- (::Vertx::Pipe) pipe

Pause this stream and return a ::Vertx::Pipe to transfer the elements of this stream to a destination WriteStream.

The stream will be resumed when the pipe will be wired to a WriteStream.

Returns:

Raises:

  • (ArgumentError)


37
38
39
40
41
42
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 37

# Call the underlying Java pipe() and wrap the result as a ::Vertx::Pipe
# parameterised with KafkaConsumerRecord's j_api_type.
#
# @return [::Vertx::Pipe] the wrapped pipe
# @raise [ArgumentError] when called with a block
def pipe
  raise ArgumentError, "Invalid arguments when calling pipe()" if block_given?

  j_pipe = @j_del.java_method(:pipe, []).call()
  ::Vertx::Util::Utils.safe_create(j_pipe,::Vertx::Pipe,::VertxKafkaClient::KafkaConsumerRecord.j_api_type)
end

- (void) pipe_to(dst = nil) { ... }

This method returns an undefined value.

Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Parameters:

Yields:

Raises:

  • (ArgumentError)


52
53
54
55
56
57
58
59
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 52

# Pipe this ReadStream to the WriteStream.
#
# Elements emitted by this stream will be written to the write stream until
# this stream ends or fails.
#
# @param [Object] dst destination stream; its class must define j_del
# @yield [cause] optional handler called with the failure cause (nil unless the
#   result failed)
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] when dst's class does not define j_del
def pipe_to(dst=nil)
  unless dst.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling pipe_to(#{dst})"
  end

  if block_given?
    j_pipe_to = @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class])
    j_pipe_to.call(dst.j_del,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    j_pipe_to = @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class])
    j_pipe_to.call(dst.j_del)
  end
end

- (void) poll(timeout = nil) { ... }

This method returns an undefined value.

Executes a poll for getting messages from Kafka

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Yields:

  • handler called after the poll with batch of records (can be empty).

Raises:

  • (ArgumentError)


537
538
539
540
541
542
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 537

# Executes a poll for getting messages from Kafka.
#
# @param [Integer] timeout The time, in milliseconds, spent waiting in poll if
#   data is not available in the buffer. Must not be negative.
# @yield [cause, records] handler called after the poll: cause is the failure
#   cause (nil unless failed); records is the result wrapped as
#   ::VertxKafkaClient::KafkaConsumerRecords (nil unless succeeded)
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless timeout is an Integer and a block is given
def poll(timeout=nil)
  # Integer rather than the legacy Fixnum constant: Fixnum was deprecated in
  # Ruby 2.4 and removed in Ruby 3.2, where referencing it raises NameError.
  unless timeout.is_a?(Integer) && block_given?
    raise ArgumentError, "Invalid arguments when calling poll(#{timeout})"
  end

  j_poll = @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class])
  on_done = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    records = ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil
    yield(cause, records)
  end
  j_poll.call(timeout,on_done)
end

- (self) poll_timeout(timeout = nil)

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000.

Parameters:

  • timeout (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)


526
527
528
529
530
531
532
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 526

# Sets the poll timeout (in ms) for the underlying native Kafka Consumer.
#
# @param [Integer] timeout value passed to the Java pollTimeout call
# @return [self]
# @raise [ArgumentError] when timeout is not an Integer or a block is given
def poll_timeout(timeout=nil)
  # Integer rather than the legacy Fixnum constant: Fixnum was deprecated in
  # Ruby 2.4 and removed in Ruby 3.2, where referencing it raises NameError.
  unless timeout.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})"
  end

  @j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout)
  self
end

- (void) position(partition = nil) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash) (defaults to: nil)
    The partition to get the position for

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


484
485
486
487
488
489
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 484

# Get the offset of the next record that will be fetched (if a record with that
# offset exists).
#
# @param [Hash] partition The partition to get the position for
# @yield [cause, offset] handler called on operation completed: cause is the
#   failure cause (nil unless failed), offset is the result (nil unless succeeded)
# @return whatever the underlying Java call returns (documented as void)
# @raise [ArgumentError] unless partition is exactly a Hash and a block is given
def position(partition=nil)
  unless partition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling position(#{partition})"
  end

  j_position = @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition))
  j_position.call(j_partition,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
end

- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching
  • - (self) resume(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching
  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 145

# Resume fetching, either globally or for specified partitions which have been
# paused with pause.
#
# Accepted call shapes:
# * resume                   -- no argument, no block: calls the no-arg Java resume
# * resume(topicPartition)   -- a Hash
# * resume(topicPartitions)  -- a Set of Hash
# * either partition form with a block, called on completion with the failure
#   cause (nil when the operation did not fail)
#
# @param [Hash,Set<Hash>] param_1 topic partition(s) from which resume fetching
# @yield handler called on operation completed
# @return [self]
# @raise [ArgumentError] for any other argument combination (including a block
#   with no partition argument)
def resume(param_1=nil)
  if !block_given? && param_1 == nil
    # Global resume: no partition given.
    @j_del.java_method(:resume, []).call()
    return self
  end

  for_single = param_1.class == Hash
  unless for_single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
  end

  # Pick the Java overload: TopicPartition or Set, optionally followed by a Handler.
  j_signature = [for_single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_resume = @j_del.java_method(:resume, j_signature)

  j_target =
    if for_single
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_resume.call(j_target,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    j_resume.call(j_target)
  end
  self
end

- (self) seek(topicPartition = nil, offset = nil) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the record #handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for which seek
  • offset (Fixnum) (defaults to: nil)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


342
343
344
345
346
347
348
349
350
351
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 342

# Overrides the fetch offsets that the consumer will use on the next poll.
#
# The partition Hash is converted to a JSON object and wrapped in a Java
# TopicPartition before being handed to the underlying Java consumer.
#
# @param [Hash] topicPartition topic partition for which seek
# @param [Integer] offset offset to seek inside the topic partition
# @yield [cause] optional completion handler; receives the failure cause when
#   the operation failed, nil otherwise
# @return [self]
# @raise [ArgumentError] if topicPartition is not a Hash or offset is not an Integer
def seek(topicPartition=nil,offset=nil)
  # Integer instead of Fixnum: the Fixnum constant was deprecated in Ruby 2.4
  # and removed in Ruby 3.2, where merely referencing it raises NameError.
  unless topicPartition.class == Hash && offset.is_a?(Integer)
    raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
  end

  if block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset)
  end
  self
end

- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToBeginning(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 372

# Seek to the first offset for each of the given partitions.
#
# A Hash is treated as a single topic partition; a Set is treated as a
# collection of topic partition hashes. Each hash is converted to a JSON
# object and wrapped in a Java TopicPartition.
#
# @overload seek_to_beginning(topicPartition)
#   @param [Hash] topicPartition topic partition for which seek
# @overload seek_to_beginning(topicPartitions)
#   @param [Set<Hash>] topicPartitions topic partition for which seek
# @yield [cause] optional completion handler; receives the failure cause when
#   the operation failed, nil otherwise
# @return [self]
# @raise [ArgumentError] if the argument is neither a Hash nor a Set
def seek_to_beginning(param_1=nil)
  arg_class = param_1.class

  if arg_class == Hash
    j_types = [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_seek = @j_del.java_method(:seekToBeginning, j_types)
    j_target = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
  elsif arg_class == Set
    j_types = [Java::JavaUtil::Set.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_seek = @j_del.java_method(:seekToBeginning, j_types)
    j_partitions = param_1.map do |partition|
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition))
    end
    j_target = Java::JavaUtil::LinkedHashSet.new(j_partitions)
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
  end

  if block_given?
    # Adapt the async result to the Ruby block: only the failure cause is passed on.
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end

- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToEnd(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToEnd(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 408

# Seek to the last offset for each of the given partitions.
#
# A Hash is treated as a single topic partition; a Set is treated as a
# collection of topic partition hashes. Each hash is converted to a JSON
# object and wrapped in a Java TopicPartition.
#
# @overload seek_to_end(topicPartition)
#   @param [Hash] topicPartition topic partition for which seek
# @overload seek_to_end(topicPartitions)
#   @param [Set<Hash>] topicPartitions topic partition for which seek
# @yield [cause] optional completion handler; receives the failure cause when
#   the operation failed, nil otherwise
# @return [self]
# @raise [ArgumentError] if the argument is neither a Hash nor a Set
def seek_to_end(param_1=nil)
  arg_class = param_1.class

  if arg_class == Hash
    j_types = [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_seek = @j_del.java_method(:seekToEnd, j_types)
    j_target = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
  elsif arg_class == Set
    j_types = [Java::JavaUtil::Set.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_seek = @j_del.java_method(:seekToEnd, j_types)
    j_partitions = param_1.map do |partition|
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition))
    end
    j_target = Java::JavaUtil::LinkedHashSet.new(j_partitions)
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
  end

  if block_given?
    # Adapt the async result to the Ruby block: only the failure cause is passed on.
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end

- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new set of topics.

Overloads:

  • - (self) subscribe(topic)

    Parameters:

    • topic (String)
      topic to subscribe to
  • - (self) subscribe(topics)

    Parameters:

    • topics (Set<String>)
      topics to subscribe to
  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 215

# Subscribe to the given topic, or set of topics, to get dynamically assigned partitions.
#
# A String is passed through as a single topic; a Set is copied into a Java
# LinkedHashSet before the call.
#
# @overload subscribe(topic)
#   @param [String] topic topic to subscribe to
# @overload subscribe(topics)
#   @param [Set<String>] topics topics to subscribe to
# @yield [cause] optional completion handler; receives the failure cause when
#   the operation failed, nil otherwise
# @return [self]
# @raise [ArgumentError] if the argument is neither a String nor a Set
def subscribe(param_1=nil)
  arg_class = param_1.class

  if arg_class == String
    j_types = [Java::java.lang.String.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_subscribe = @j_del.java_method(:subscribe, j_types)
    j_target = param_1
  elsif arg_class == Set
    j_types = [Java::JavaUtil::Set.java_class]
    j_types << Java::IoVertxCore::Handler.java_class if block_given?
    j_subscribe = @j_del.java_method(:subscribe, j_types)
    j_target = Java::JavaUtil::LinkedHashSet.new(param_1.to_a)
  else
    raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
  end

  if block_given?
    # Adapt the async result to the Ruby block: only the failure cause is passed on.
    j_subscribe.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_subscribe.call(j_target)
  end
  self
end

- (self) subscription { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


293
294
295
296
297
298
299
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 293

# Get the current subscription.
#
# @yield [cause, topics] completion handler; cause is the failure cause when
#   the operation failed (nil otherwise), topics is the result converted to a
#   Ruby set when the operation succeeded (nil otherwise)
# @return [self]
# @raise [ArgumentError] if no block is given
def subscription
  raise ArgumentError, "Invalid arguments when calling subscription()" unless block_given?

  on_result = Proc.new do |ar|
    cause = ar.failed ? ar.cause : nil
    # The identity map! is retained from the generated code so the yielded value is unchanged.
    topics = ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |topic| topic } : nil
    yield(cause, topics)
  end
  @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(on_result)
  self
end

- (self) unsubscribe { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


280
281
282
283
284
285
286
287
288
289
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 280

# Unsubscribe from topics currently subscribed with subscribe.
#
# Every call is valid: without a block the no-argument Java overload is used,
# with a block the handler overload is used. The generated
# `if !block_given? / elsif block_given?` chain was exhaustive, so its trailing
# ArgumentError raise could never run and has been removed.
#
# @yield [cause] optional completion handler; receives the failure cause when
#   the operation failed, nil otherwise
# @return [self]
def unsubscribe
  if block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  else
    @j_del.java_method(:unsubscribe, []).call()
  end
  self
end