Class: VertxKafkaClient::KafkaConsumer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb
Overview
You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.
The #pause and #resume provides global control over reading the records from the consumer.
The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance.
Instance Method Summary (collapse)
-
- (self) assign(param_1 = nil)
Manually assign a list of partition to this consumer.
-
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
-
- (self) batch_handler { ... }
Set the handler to be used when batches of messages are fetched from the Kafka server.
-
- (void) beginning_offsets(topicPartition = nil) { ... }
Get the first offset for the given partitions.
-
- (void) close { ... }
Close the consumer.
-
- (void) commit { ... }
Commit current offsets for all the subscribed list of topics and partition.
-
- (void) committed(topicPartition = nil) { ... }
Get the last committed offset for the given partition (whether the commit happened by this process or another).
- - (self) end_handler { ... }
-
- (void) end_offsets(topicPartition = nil) { ... }
Get the last offset for the given partition.
- - (self) exception_handler { ... }
- - (self) fetch(amount = nil)
- - (self) handler { ... }
-
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
Look up the offset for the given partition by timestamp.
-
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer.
-
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
-
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked to the consumer.
-
- (self) pause(param_1 = nil)
Suspend fetching from the requested partitions.
-
- (void) paused { ... }
Get the set of partitions that were previously paused by a call to pause(Set).
-
- (::Vertx::Pipe) pipe
Pause this stream and return a to transfer the elements of this stream to a destination .
-
- (void) pipe_to(dst = nil) { ... }
Pipe this ReadStream to the WriteStream.
-
- (void) poll(timeout = nil) { ... }
Executes a poll for getting messages from Kafka.
-
- (self) poll_timeout(timeout = nil)
Sets the poll timeout (in ms) for the underlying native Kafka Consumer.
-
- (void) position(partition = nil) { ... }
Get the offset of the next record that will be fetched (if a record with that offset exists).
-
- (self) resume(param_1 = nil)
Resume specified partitions which have been paused with pause.
-
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
-
- (self) seek_to_beginning(param_1 = nil)
Seek to the first offset for each of the given partitions.
-
- (self) seek_to_end(param_1 = nil)
Seek to the last offset for each of the given partitions.
-
- (self) subscribe(param_1 = nil)
Subscribe to the given list of topics to get dynamically assigned partitions.
-
- (self) subscription { ... }
Get the current subscription.
-
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
Class Method Details
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
66 67 68 69 70 71 72 73 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 66 def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil) if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil) elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given? return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType)) end raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})" end |
Instance Method Details
- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages, when reassigning
the old set of partitions may remain in effect
(as observed by the record handler)}
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of partitions.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 238 def assign(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assign(#{param_1})" end |
- (self) assignment { ... }
257 258 259 260 261 262 263 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 257 def assignment if block_given? @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assignment()" end |
- (self) batch_handler { ... }
449 450 451 452 453 454 455 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 449 def batch_handler if block_given? @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling batch_handler()" end |
- (void) beginning_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the first offset for the given partitions.
493 494 495 496 497 498 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 493 def beginning_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})" end |
- (void) close { ... }
This method returns an undefined value.
Close the consumer
459 460 461 462 463 464 465 466 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 459 def close if !block_given? return @j_del.java_method(:close, []).call() elsif block_given? return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling close()" end |
- (void) commit { ... }
This method returns an undefined value.
Commit current offsets for all the subscribed list of topics and partition.
414 415 416 417 418 419 420 421 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 414 def commit if !block_given? return @j_del.java_method(:commit, []).call() elsif block_given? return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling commit()" end |
- (void) committed(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last committed offset for the given partition (whether the commit happened by this process or another).
426 427 428 429 430 431 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 426 def committed(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})" end |
- (self) end_handler { ... }
166 167 168 169 170 171 172 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 166 def end_handler if block_given? @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield }) return self end raise ArgumentError, "Invalid arguments when calling end_handler()" end |
- (void) end_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
504 505 506 507 508 509 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 504 def end_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})" end |
- (self) exception_handler { ... }
76 77 78 79 80 81 82 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 76 def exception_handler if block_given? @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) })) return self end raise ArgumentError, "Invalid arguments when calling exception_handler()" end |
- (self) fetch(amount = nil)
175 176 177 178 179 180 181 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 175 def fetch(amount=nil) if amount.class == Fixnum && !block_given? @j_del.java_method(:fetch, [Java::long.java_class]).call(amount) return self end raise ArgumentError, "Invalid arguments when calling fetch(#{amount})" end |
- (self) handler { ... }
85 86 87 88 89 90 91 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 85 def handler if block_given? @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling handler()" end |
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
This method returns an undefined value.
Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future
483 484 485 486 487 488 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 483 def offsets_for_times(topicPartition=nil,=nil) if topicPartition.class == Hash && .class == Fixnum && block_given? return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{})" end |
- (self) partitions_assigned_handler { ... }
309 310 311 312 313 314 315 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 309 def partitions_assigned_handler if block_given? @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" end |
- (self) partitions_for(topic = nil) { ... }
436 437 438 439 440 441 442 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 436 def partitions_for(topic=nil) if topic.class == String && block_given? @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})" end |
- (self) partitions_revoked_handler { ... }
299 300 301 302 303 304 305 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 299 def partitions_revoked_handler if block_given? @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" end |
- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages from the given topicPartitions
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will not see messages
from the given topicPartitions
.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 113 def pause(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:pause, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling pause(#{param_1})" end |
- (void) paused { ... }
This method returns an undefined value.
Get the set of partitions that were previously paused by a call to pause(Set).
290 291 292 293 294 295 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 290 def paused if block_given? return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) end raise ArgumentError, "Invalid arguments when calling paused()" end |
- (::Vertx::Pipe) pipe
WriteStream
.
37 38 39 40 41 42 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 37 def pipe if !block_given? return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pipe, []).call(),::Vertx::Pipe,::VertxKafkaClient::KafkaConsumerRecord.j_api_type) end raise ArgumentError, "Invalid arguments when calling pipe()" end |
- (void) pipe_to(dst = nil) { ... }
This method returns an undefined value.
Pipe thisReadStream
to the WriteStream
.
Elements emitted by this stream will be written to the write stream until this stream ends or fails.
Once this stream has ended or failed, the write stream will be ended and the handler
will be
called with the result.
52 53 54 55 56 57 58 59 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 52 def pipe_to(dst=nil) if dst.class.method_defined?(:j_del) && !block_given? return @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class]).call(dst.j_del) elsif dst.class.method_defined?(:j_del) && block_given? return @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class]).call(dst.j_del,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling pipe_to(#{dst})" end |
- (void) poll(timeout = nil) { ... }
This method returns an undefined value.
Executes a poll for getting messages from Kafka
527 528 529 530 531 532 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 527 def poll(timeout=nil) if timeout.class == Fixnum && block_given? return @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil) })) end raise ArgumentError, "Invalid arguments when calling poll(#{timeout})" end |
- (self) poll_timeout(timeout = nil)
516 517 518 519 520 521 522 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 516 def poll_timeout(timeout=nil) if timeout.class == Fixnum && !block_given? @j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout) return self end raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})" end |
- (void) position(partition = nil) { ... }
This method returns an undefined value.
Get the offset of the next record that will be fetched (if a record with that offset exists).
471 472 473 474 475 476 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 471 def position(partition=nil) if partition.class == Hash && block_given? return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling position(#{partition})" end |
- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 145 def resume(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:resume, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling resume(#{param_1})" end |
- (self) seek(topicPartition = nil, offset = nil) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
329 330 331 332 333 334 335 336 337 338 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 329 def seek(topicPartition=nil,offset=nil) if topicPartition.class == Hash && offset.class == Fixnum && !block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset) return self elsif topicPartition.class == Hash && offset.class == Fixnum && block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})" end |
- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 359 def seek_to_beginning(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})" end |
- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 395 def seek_to_end(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})" end |
- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }
Due to internal buffering of messages, when changing the subscribed topics
the old set of topics may remain in effect
(as observed by the record handler})
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of topics.
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 202 def subscribe(param_1=nil) if param_1.class == String && !block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element })) return self elsif param_1.class == String && block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})" end |
- (self) subscription { ... }
280 281 282 283 284 285 286 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 280 def subscription if block_given? @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscription()" end |
- (self) unsubscribe { ... }
267 268 269 270 271 272 273 274 275 276 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 267 def unsubscribe if !block_given? @j_del.java_method(:unsubscribe, []).call() return self elsif block_given? @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling unsubscribe()" end |