Class: VertxKafkaClient::KafkaConsumer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb
Overview
You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.
Called without arguments, #pause and #resume provide global control over reading the records from the consumer.
Called with specific topic partitions, #pause and #resume provide finer-grained control over reading records for those partitions; these are Kafka-specific operations.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance.
Instance Method Summary (collapse)
-
- (self) assign(param_1 = nil)
Manually assign a list of partitions to this consumer.
-
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
-
- (self) batch_handler { ... }
Set the handler to be used when batches of messages are fetched from the Kafka server.
-
- (void) beginning_offsets(topicPartition = nil) { ... }
Get the first offset for the given partitions.
-
- (void) close { ... }
Close the consumer.
-
- (void) commit { ... }
Commit current offsets for all the subscribed topics and partitions.
-
- (void) committed(topicPartition = nil) { ... }
Get the last committed offset for the given partition (whether the commit happened by this process or another).
-
- (Fixnum) demand
Returns the current demand.
- - (self) end_handler { ... }
-
- (void) end_offsets(topicPartition = nil) { ... }
Get the last offset for the given partition.
- - (self) exception_handler { ... }
- - (self) fetch(amount = nil)
- - (self) handler { ... }
-
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
Look up the offset for the given partition by timestamp.
-
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer.
-
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
-
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked from the consumer.
-
- (self) pause(param_1 = nil)
Suspend fetching from the requested partitions.
-
- (void) paused { ... }
Get the set of partitions that were previously paused by a call to pause(Set).
-
- (::Vertx::Pipe) pipe
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.
-
- (void) pipe_to(dst = nil) { ... }
Pipe this ReadStream to the WriteStream.
-
- (void) poll(timeout = nil) { ... }
Executes a poll for getting messages from Kafka.
-
- (self) poll_timeout(timeout = nil)
Sets the poll timeout (in ms) for the underlying native Kafka Consumer.
-
- (void) position(partition = nil) { ... }
Get the offset of the next record that will be fetched (if a record with that offset exists).
-
- (self) resume(param_1 = nil)
Resume specified partitions which have been paused with pause.
-
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
-
- (self) seek_to_beginning(param_1 = nil)
Seek to the first offset for each of the given partitions.
-
- (self) seek_to_end(param_1 = nil)
Seek to the last offset for each of the given partitions.
-
- (self) subscribe(param_1 = nil)
Subscribe to the given list of topics to get dynamically assigned partitions.
-
- (self) subscription { ... }
Get the current subscription.
-
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
Class Method Details
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
66 67 68 69 70 71 72 73 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 66
# Create a new KafkaConsumer instance.
#
# Two call shapes are accepted, both without a block:
# * create(vertx, config) - keyType and valueType left as nil
# * create(vertx, config, keyType, valueType) - both given as Class objects
#
# @param vertx an object whose class defines j_del; its j_del is passed to Java
# @param [Hash] config consumer configuration, copied into a fresh Hash before the Java call
# @param [Class] keyType class of the record keys (optional)
# @param [Class] valueType class of the record values (optional)
# @return [::VertxKafkaClient::KafkaConsumer] the wrapped consumer
# @raise [ArgumentError] when the arguments match neither call shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given?
    j_factory = Java::IoVertxKafkaClientConsumer::KafkaConsumer
    if keyType == nil && valueType == nil
      j_create = j_factory.java_method(:create, [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class])
      j_consumer = j_create.call(vertx.j_del, Hash[config.map { |key, value| [key, value] }])
      return ::Vertx::Util::Utils.safe_create(j_consumer, ::VertxKafkaClient::KafkaConsumer, nil, nil)
    elsif keyType.class == Class && valueType.class == Class
      j_create = j_factory.java_method(:create, [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class, Java::JavaLang::Class.java_class, Java::JavaLang::Class.java_class])
      j_consumer = j_create.call(
        vertx.j_del,
        Hash[config.map { |key, value| [key, value] }],
        ::Vertx::Util::Utils.j_class_of(keyType),
        ::Vertx::Util::Utils.j_class_of(valueType)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_consumer,
        ::VertxKafkaClient::KafkaConsumer,
        ::Vertx::Util::Utils.v_type_of(keyType),
        ::Vertx::Util::Utils.v_type_of(valueType)
      )
    end
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end
Instance Method Details
- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages, when reassigning
the old set of partitions may remain in effect
(as observed by the record handler)
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of partitions.
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 251
# Manually assign a partition, or a set of partitions, to this consumer.
#
# @param [Hash, Set] param_1 one topic partition (Hash) or a Set of such hashes
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] when param_1 is neither a Hash nor a Set
def assign(param_1=nil)
  single = param_1.class == Hash
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
  end
  # Kept lazy so the Java classes are only touched once the argument type is accepted.
  to_j_partition = lambda do |spec|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(spec))
  end
  signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_assign = @j_del.java_method(:assign, signature)
  j_target =
    if single
      to_j_partition.call(param_1)
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |spec| to_j_partition.call(spec) })
    end
  if block_given?
    j_assign.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_assign.call(j_target)
  end
  self
end
- (self) assignment { ... }
270 271 272 273 274 275 276 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 270
# Get the set of partitions currently assigned to this consumer.
#
# @yield [cause, partitions] cause is the failure (or nil); partitions is the result
#   converted to a set of parsed-JSON hashes when the call succeeded, otherwise nil
# @return [self]
# @raise [ArgumentError] when no block is given
def assignment
  raise ArgumentError, "Invalid arguments when calling assignment()" unless block_given?
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! do |elt|
        elt == nil ? nil : JSON.parse(elt.toJson.encode)
      end
    end
    yield(failure, partitions)
  end
  @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call(on_result)
  self
end
- (self) batch_handler { ... }
462 463 464 465 466 467 468 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 462
# Set the handler to be used when batches of messages are fetched from the Kafka server.
#
# @yield [records] each batch, wrapped as ::VertxKafkaClient::KafkaConsumerRecords
# @return [self]
# @raise [ArgumentError] when no block is given
def batch_handler
  raise ArgumentError, "Invalid arguments when calling batch_handler()" unless block_given?
  on_batch = Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event, ::VertxKafkaClient::KafkaConsumerRecords, nil, nil))
  end
  @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call(on_batch)
  self
end
- (void) beginning_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the first offset for the given partitions.
506 507 508 509 510 511 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 506
# Get the first offset for the given partition.
#
# @param [Hash] topicPartition the topic partition to query
# @yield [cause, offset] cause is the failure (or nil); offset is the raw result on success, otherwise nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless topicPartition is a Hash and a block is given
def beginning_offsets(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})"
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  j_lookup = @j_del.java_method(:beginningOffsets, [j_partition_class.java_class, Java::IoVertxCore::Handler.java_class])
  j_lookup.call(
    j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)),
    Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }
  )
end
- (void) close { ... }
This method returns an undefined value.
Close the consumer
472 473 474 475 476 477 478 479 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 472
# Close the consumer.
#
# The generated version ended with a `raise ArgumentError` that could never run,
# because the two branches (`!block_given?` / `block_given?`) are exhaustive.
# It is expressed here as a plain if/else.
#
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return whatever the underlying Java call returns
def close
  if block_given?
    @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    @j_del.java_method(:close, []).call()
  end
end
- (void) commit { ... }
This method returns an undefined value.
Commit current offsets for all the subscribed topics and partitions.
427 428 429 430 431 432 433 434 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 427
# Commit current offsets for all the subscribed topics and partitions.
#
# The generated version ended with a `raise ArgumentError` that could never run,
# because the two branches (`!block_given?` / `block_given?`) are exhaustive.
# It is expressed here as a plain if/else.
#
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return whatever the underlying Java call returns
def commit
  if block_given?
    @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    @j_del.java_method(:commit, []).call()
  end
end
- (void) committed(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last committed offset for the given partition (whether the commit happened by this process or another).
439 440 441 442 443 444 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 439
# Get the last committed offset for the given partition
# (whether the commit happened by this process or another).
#
# @param [Hash] topicPartition the topic partition to query
# @yield [cause, committed] cause is the failure (or nil); committed is the result parsed
#   from its JSON encoding, or nil when the call failed or the result was nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless topicPartition is a Hash and a block is given
def committed(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
  end
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    decoded = nil
    if ar.succeeded
      decoded = JSON.parse(ar.result.toJson.encode) if ar.result != nil
    end
    yield(failure, decoded)
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  j_committed = @j_del.java_method(:committed, [j_partition_class.java_class, Java::IoVertxCore::Handler.java_class])
  j_committed.call(j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)), on_result)
end
- (Fixnum) demand
-
If the stream is in flowing mode will return MAX_VALUE.
- If the stream is in fetch mode, will return the current number of elements still to be delivered or 0 if paused.
189 190 191 192 193 194 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 189
# Returns the current demand, as reported by the underlying Java consumer.
#
# @return the value returned by the Java demand() call
# @raise [ArgumentError] when a block is given
def demand
  raise ArgumentError, "Invalid arguments when calling demand()" if block_given?
  @j_del.java_method(:demand, []).call()
end
- (self) end_handler { ... }
166 167 168 169 170 171 172 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 166
# Register the block as the end handler of the underlying Java consumer.
#
# @yield called with no arguments when the Java side invokes the handler
# @return [self]
# @raise [ArgumentError] when no block is given
def end_handler
  raise ArgumentError, "Invalid arguments when calling end_handler()" unless block_given?
  on_end = Proc.new { yield }
  @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(on_end)
  self
end
- (void) end_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
517 518 519 520 521 522 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 517
# Get the last offset for the given partition.
#
# @param [Hash] topicPartition the topic partition to query
# @yield [cause, offset] cause is the failure (or nil); offset is the raw result on success, otherwise nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless topicPartition is a Hash and a block is given
def end_offsets(topicPartition=nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})"
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  j_lookup = @j_del.java_method(:endOffsets, [j_partition_class.java_class, Java::IoVertxCore::Handler.java_class])
  j_lookup.call(
    j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)),
    Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }
  )
end
- (self) exception_handler { ... }
76 77 78 79 80 81 82 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 76
# Register the block as the exception handler of the underlying Java consumer.
#
# @yield [error] the Java throwable converted with ::Vertx::Util::Utils.from_throwable
# @return [self]
# @raise [ArgumentError] when no block is given
def exception_handler
  raise ArgumentError, "Invalid arguments when calling exception_handler()" unless block_given?
  on_error = Proc.new do |event|
    yield(::Vertx::Util::Utils.from_throwable(event))
  end
  @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call(on_error)
  self
end
- (self) fetch(amount = nil)
175 176 177 178 179 180 181 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 175
# Forward a fetch request for the given amount to the underlying Java consumer.
#
# The integer check uses is_a?(Integer) rather than `.class == Fixnum`:
# the Fixnum constant was deprecated in Ruby 2.4 and removed in Ruby 3.2, where
# referencing it raises NameError before the argument can even be validated.
#
# @param [Integer] amount passed to Java as a long
# @return [self]
# @raise [ArgumentError] when amount is not an Integer or a block is given
def fetch(amount=nil)
  unless amount.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling fetch(#{amount})"
  end
  @j_del.java_method(:fetch, [Java::long.java_class]).call(amount)
  self
end
- (self) handler { ... }
85 86 87 88 89 90 91 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 85
# Register the block as the record handler of the underlying Java consumer.
#
# @yield [record] each event, wrapped as ::VertxKafkaClient::KafkaConsumerRecord
# @return [self]
# @raise [ArgumentError] when no block is given
def handler
  raise ArgumentError, "Invalid arguments when calling handler()" unless block_given?
  on_record = Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event, ::VertxKafkaClient::KafkaConsumerRecord, nil, nil))
  end
  @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call(on_record)
  self
end
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
This method returns an undefined value.
Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future
496 497 498 499 500 501 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 496
# Look up the offset for the given partition by timestamp.
#
# The listing had lost the name of the second parameter everywhere it was used
# (`,=nil`, `.class == Fixnum`, `,,(Proc.new`, `#{}`), which is not valid Ruby.
# It is restored as `timestamp`, matching the documented signature.
# The integer check uses is_a?(Integer) because the Fixnum constant no longer
# exists on Ruby >= 3.2.
#
# @param [Hash] topicPartition the topic partition to query
# @param [Integer] timestamp the timestamp to search for, passed to Java as a Long
# @yield [cause, found] cause is the failure (or nil); found is the result parsed from
#   its JSON encoding, or nil when the call failed or no result was returned
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless topicPartition is a Hash, timestamp an Integer and a block is given
def offsets_for_times(topicPartition=nil,timestamp=nil)
  unless topicPartition.class == Hash && timestamp.is_a?(Integer) && block_given?
    raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})"
  end
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    found = nil
    if ar.succeeded
      found = JSON.parse(ar.result.toJson.encode) if ar.result != nil
    end
    yield(failure, found)
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  j_lookup = @j_del.java_method(:offsetsForTimes, [j_partition_class.java_class, Java::JavaLang::Long.java_class, Java::IoVertxCore::Handler.java_class])
  j_lookup.call(j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)), timestamp, on_result)
end
- (self) partitions_assigned_handler { ... }
322 323 324 325 326 327 328 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 322
# Set the handler called when topic partitions are assigned to the consumer.
#
# @yield [partitions] the event converted to a set whose non-nil elements are parsed-JSON hashes
# @return [self]
# @raise [ArgumentError] when no block is given
def partitions_assigned_handler
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" unless block_given?
  on_assigned = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! do |elt|
      elt == nil ? nil : JSON.parse(elt.toJson.encode)
    end
    yield(partitions)
  end
  @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_assigned)
  self
end
- (self) partitions_for(topic = nil) { ... }
449 450 451 452 453 454 455 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 449
# Get metadata about the partitions for a given topic.
#
# @param [String] topic the topic to describe
# @yield [cause, partitions] cause is the failure (or nil); partitions is an Array of
#   parsed-JSON hashes (nil elements preserved) on success, otherwise nil
# @return [self]
# @raise [ArgumentError] unless topic is a String and a block is given
def partitions_for(topic=nil)
  unless topic.class == String && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ar.result.to_a.map { |elt| elt == nil ? nil : JSON.parse(elt.toJson.encode) }
    end
    yield(failure, partitions)
  end
  @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class, Java::IoVertxCore::Handler.java_class]).call(topic, on_result)
  self
end
- (self) partitions_revoked_handler { ... }
312 313 314 315 316 317 318 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 312
# Set the handler called when topic partitions are revoked from the consumer.
#
# @yield [partitions] the event converted to a set whose non-nil elements are parsed-JSON hashes
# @return [self]
# @raise [ArgumentError] when no block is given
def partitions_revoked_handler
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" unless block_given?
  on_revoked = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! do |elt|
      elt == nil ? nil : JSON.parse(elt.toJson.encode)
    end
    yield(partitions)
  end
  @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_revoked)
  self
end
- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the record handler will
continue to observe messages from the given topicPartitions
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will not see messages
from the given topicPartitions.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 113
# Suspend fetching: globally when called with no argument and no block,
# otherwise from the requested partition(s).
#
# @param [Hash, Set] param_1 one topic partition (Hash) or a Set of such hashes; nil for the global form
# @yield [cause] optional completion handler (partition forms only); receives the failure cause, or nil
# @return [self]
# @raise [ArgumentError] when the arguments match none of the supported forms
def pause(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:pause, []).call()
    return self
  end
  single = param_1.class == Hash
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
  end
  # Kept lazy so the Java classes are only touched once the argument type is accepted.
  to_j_partition = lambda do |spec|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(spec))
  end
  signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_pause = @j_del.java_method(:pause, signature)
  j_target =
    if single
      to_j_partition.call(param_1)
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |spec| to_j_partition.call(spec) })
    end
  if block_given?
    j_pause.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_pause.call(j_target)
  end
  self
end
- (void) paused { ... }
This method returns an undefined value.
Get the set of partitions that were previously paused by a call to pause(Set).
303 304 305 306 307 308 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 303
# Get the set of partitions that were previously paused by a call to pause(Set).
#
# @yield [cause, partitions] cause is the failure (or nil); partitions is the result
#   converted to a set of parsed-JSON hashes when the call succeeded, otherwise nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] when no block is given
def paused
  raise ArgumentError, "Invalid arguments when calling paused()" unless block_given?
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! do |elt|
        elt == nil ? nil : JSON.parse(elt.toJson.encode)
      end
    end
    yield(failure, partitions)
  end
  @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call(on_result)
end
- (::Vertx::Pipe) pipe
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.
37 38 39 40 41 42 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 37
# Call pipe() on the underlying Java consumer and wrap the result as a ::Vertx::Pipe.
#
# @return [::Vertx::Pipe] the wrapped pipe, typed with KafkaConsumerRecord.j_api_type
# @raise [ArgumentError] when a block is given
def pipe
  raise ArgumentError, "Invalid arguments when calling pipe()" if block_given?
  j_pipe = @j_del.java_method(:pipe, []).call()
  ::Vertx::Util::Utils.safe_create(j_pipe, ::Vertx::Pipe, ::VertxKafkaClient::KafkaConsumerRecord.j_api_type)
end
- (void) pipe_to(dst = nil) { ... }
This method returns an undefined value.
Pipe this ReadStream to the WriteStream.
Elements emitted by this stream will be written to the write stream until this stream ends or fails.
Once this stream has ended or failed, the write stream will be ended and the handler
will be called with the result.
52 53 54 55 56 57 58 59 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 52
# Pipe this ReadStream to the WriteStream.
#
# @param dst destination stream; any object whose class defines j_del
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return whatever the underlying Java call returns
# @raise [ArgumentError] when dst's class does not define j_del
def pipe_to(dst=nil)
  unless dst.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling pipe_to(#{dst})"
  end
  if block_given?
    j_pipe_to = @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class, Java::IoVertxCore::Handler.java_class])
    j_pipe_to.call(dst.j_del, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_pipe_to = @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class])
    j_pipe_to.call(dst.j_del)
  end
end
- (void) poll(timeout = nil) { ... }
This method returns an undefined value.
Executes a poll for getting messages from Kafka
537 538 539 540 541 542 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 537
# Executes a poll for getting messages from Kafka.
#
# The integer check uses is_a?(Integer) rather than `.class == Fixnum`:
# the Fixnum constant was deprecated in Ruby 2.4 and removed in Ruby 3.2, where
# referencing it raises NameError.
#
# @param [Integer] timeout passed to Java as a long
# @yield [cause, records] cause is the failure (or nil); records is the result wrapped as
#   ::VertxKafkaClient::KafkaConsumerRecords on success, otherwise nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless timeout is an Integer and a block is given
def poll(timeout=nil)
  unless timeout.is_a?(Integer) && block_given?
    raise ArgumentError, "Invalid arguments when calling poll(#{timeout})"
  end
  on_records = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    records = ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result, ::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil
    yield(failure, records)
  end
  @j_del.java_method(:poll, [Java::long.java_class, Java::IoVertxCore::Handler.java_class]).call(timeout, on_records)
end
- (self) poll_timeout(timeout = nil)
526 527 528 529 530 531 532 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 526
# Sets the poll timeout (in ms) for the underlying native Kafka Consumer.
#
# The integer check uses is_a?(Integer) rather than `.class == Fixnum`:
# the Fixnum constant was deprecated in Ruby 2.4 and removed in Ruby 3.2, where
# referencing it raises NameError.
#
# @param [Integer] timeout passed to Java as a long
# @return [self]
# @raise [ArgumentError] when timeout is not an Integer or a block is given
def poll_timeout(timeout=nil)
  unless timeout.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})"
  end
  @j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout)
  self
end
- (void) position(partition = nil) { ... }
This method returns an undefined value.
Get the offset of the next record that will be fetched (if a record with that offset exists).
484 485 486 487 488 489 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 484
# Get the offset of the next record that will be fetched (if a record with that offset exists).
#
# @param [Hash] partition the topic partition to query
# @yield [cause, offset] cause is the failure (or nil); offset is the raw result on success, otherwise nil
# @return whatever the underlying Java call returns
# @raise [ArgumentError] unless partition is a Hash and a block is given
def position(partition=nil)
  unless partition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling position(#{partition})"
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  j_position = @j_del.java_method(:position, [j_partition_class.java_class, Java::IoVertxCore::Handler.java_class])
  j_position.call(
    j_partition_class.new(::Vertx::Util::Utils.to_json_object(partition)),
    Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }
  )
end
- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 145
# Resume fetching: globally when called with no argument and no block,
# otherwise for the specified partition(s) which have been paused with pause.
#
# @param [Hash, Set] param_1 one topic partition (Hash) or a Set of such hashes; nil for the global form
# @yield [cause] optional completion handler (partition forms only); receives the failure cause, or nil
# @return [self]
# @raise [ArgumentError] when the arguments match none of the supported forms
def resume(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:resume, []).call()
    return self
  end
  single = param_1.class == Hash
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
  end
  # Kept lazy so the Java classes are only touched once the argument type is accepted.
  to_j_partition = lambda do |spec|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(spec))
  end
  signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_resume = @j_del.java_method(:resume, signature)
  j_target =
    if single
      to_j_partition.call(param_1)
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |spec| to_j_partition.call(spec) })
    end
  if block_given?
    j_resume.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_resume.call(j_target)
  end
  self
end
- (self) seek(topicPartition = nil, offset = nil) { ... }
Due to internal buffering of messages,
the record handler will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
342 343 344 345 346 347 348 349 350 351 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 342
# Overrides the fetch offsets that the consumer will use on the next poll.
#
# The integer check uses is_a?(Integer) rather than `.class == Fixnum`:
# the Fixnum constant was deprecated in Ruby 2.4 and removed in Ruby 3.2, where
# referencing it raises NameError.
#
# @param [Hash] topicPartition the topic partition to seek in
# @param [Integer] offset the new offset, passed to Java as a long
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] unless topicPartition is a Hash and offset an Integer
def seek(topicPartition=nil,offset=nil)
  unless topicPartition.class == Hash && offset.is_a?(Integer)
    raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
  end
  j_partition_class = Java::IoVertxKafkaClientCommon::TopicPartition
  if block_given?
    j_seek = @j_del.java_method(:seek, [j_partition_class.java_class, Java::long.java_class, Java::IoVertxCore::Handler.java_class])
    j_seek.call(
      j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)),
      offset,
      Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    )
  else
    j_seek = @j_del.java_method(:seek, [j_partition_class.java_class, Java::long.java_class])
    j_seek.call(j_partition_class.new(::Vertx::Util::Utils.to_json_object(topicPartition)), offset)
  end
  self
end
- (self) seek_to_beginning(topicPartition) - (self) seek_to_beginning(topicPartitions) - (self) seek_to_beginning(topicPartition, completionHandler) { ... } - (self) seek_to_beginning(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the record handler will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 372
# Seek to the first offset for each of the given partitions.
#
# @param [Hash, Set] param_1 one topic partition (Hash) or a Set of such hashes
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] when param_1 is neither a Hash nor a Set
def seek_to_beginning(param_1=nil)
  single = param_1.class == Hash
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
  end
  # Kept lazy so the Java classes are only touched once the argument type is accepted.
  to_j_partition = lambda do |spec|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(spec))
  end
  signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_seek = @j_del.java_method(:seekToBeginning, signature)
  j_target =
    if single
      to_j_partition.call(param_1)
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |spec| to_j_partition.call(spec) })
    end
  if block_given?
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end
- (self) seek_to_end(topicPartition) - (self) seek_to_end(topicPartitions) - (self) seek_to_end(topicPartition, completionHandler) { ... } - (self) seek_to_end(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the record handler will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 408
# Seek to the last offset for each of the given partitions.
#
# @param [Hash, Set] param_1 one topic partition (Hash) or a Set of such hashes
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] when param_1 is neither a Hash nor a Set
def seek_to_end(param_1=nil)
  single = param_1.class == Hash
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
  end
  # Kept lazy so the Java classes are only touched once the argument type is accepted.
  to_j_partition = lambda do |spec|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(spec))
  end
  signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_seek = @j_del.java_method(:seekToEnd, signature)
  j_target =
    if single
      to_j_partition.call(param_1)
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |spec| to_j_partition.call(spec) })
    end
  if block_given?
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end
- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }
Due to internal buffering of messages, when changing the subscribed topics
the old set of topics may remain in effect
(as observed by the record handler)
until some time after the given completionHandler
is called. In contrast, once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of topics.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 215
# Subscribe to the given topic, or set of topics, to get dynamically assigned partitions.
#
# @param [String, Set] param_1 a single topic name or a Set of topic names
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
# @raise [ArgumentError] when param_1 is neither a String nor a Set
def subscribe(param_1=nil)
  single = param_1.class == String
  unless single || param_1.class == Set
    raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
  end
  signature = [single ? Java::java.lang.String.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_subscribe = @j_del.java_method(:subscribe, signature)
  j_topics = single ? param_1 : Java::JavaUtil::LinkedHashSet.new(param_1.map { |topic| topic })
  if block_given?
    j_subscribe.call(j_topics, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_subscribe.call(j_topics)
  end
  self
end
- (self) subscription { ... }
293 294 295 296 297 298 299 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 293
# Get the current subscription.
#
# @yield [cause, topics] cause is the failure (or nil); topics is the result converted
#   with ::Vertx::Util::Utils.to_set on success, otherwise nil
# @return [self]
# @raise [ArgumentError] when no block is given
def subscription
  raise ArgumentError, "Invalid arguments when calling subscription()" unless block_given?
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    topics = nil
    if ar.succeeded
      topics = ::Vertx::Util::Utils.to_set(ar.result).map! { |topic| topic }
    end
    yield(failure, topics)
  end
  @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(on_result)
  self
end
- (self) unsubscribe { ... }
280 281 282 283 284 285 286 287 288 289 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 280
# Unsubscribe from topics currently subscribed with subscribe.
#
# The generated version ended with a `raise ArgumentError` that could never run,
# because the two branches (`!block_given?` / `block_given?`) are exhaustive.
# It is expressed here as a plain if/else.
#
# @yield [cause] optional completion handler; receives the failure cause, or nil on success
# @return [self]
def unsubscribe
  if block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    @j_del.java_method(:unsubscribe, []).call()
  end
  self
end