Class: VertxKafkaClient::KafkaConsumer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb
Overview
Vert.x Kafka consumer.
You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.
Calling #pause and #resume without arguments provides global control over reading records from the consumer.
Calling #pause and #resume with a topic partition (or a set of topic partitions) provides finer-grained control over reading records for specific topics/partitions; these are Kafka-specific operations.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance.
Instance Method Summary (collapse)
-
- (self) assign(param_1 = nil)
Manually assign a list of partitions to this consumer.
-
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
-
- (self) batch_handler { ... }
Set the handler to be used when batches of messages are fetched from the Kafka server.
-
- (void) beginning_offsets(topicPartition = nil) { ... }
Get the first offset for the given partitions.
-
- (void) close { ... }
Close the consumer.
-
- (void) commit { ... }
Commit current offsets for all subscribed topics and partitions.
-
- (void) committed(topicPartition = nil) { ... }
Get the last committed offset for the given partition (whether the commit happened by this process or another).
- - (self) end_handler { ... }
-
- (void) end_offsets(topicPartition = nil) { ... }
Get the last offset for the given partition.
- - (self) exception_handler { ... }
- - (self) handler { ... }
-
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
Look up the offset for the given partition by timestamp.
-
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer.
-
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
-
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked from the consumer.
-
- (self) pause(param_1 = nil)
Suspend fetching from the requested partitions.
-
- (void) paused { ... }
Get the set of partitions that were previously paused by a call to pause(Set).
-
- (void) position(partition = nil) { ... }
Get the offset of the next record that will be fetched (if a record with that offset exists).
-
- (self) resume(param_1 = nil)
Resume specified partitions which have been paused with pause.
-
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
-
- (self) seek_to_beginning(param_1 = nil)
Seek to the first offset for each of the given partitions.
-
- (self) seek_to_end(param_1 = nil)
Seek to the last offset for each of the given partitions.
-
- (self) subscribe(param_1 = nil)
Subscribe to the given list of topics to get dynamically assigned partitions.
-
- (self) subscription { ... }
Get the current subscription.
-
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
Class Method Details
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance
37 38 39 40 41 42 43 44 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 37 def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil) if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil) elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given? return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType)) end raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})" end |
Instance Method Details
- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }
Manually assign a list of partitions to this consumer.
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 176 def assign(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assign(#{param_1})" end |
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
195 196 197 198 199 200 201 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 195 def assignment if block_given? @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assignment()" end |
- (self) batch_handler { ... }
Set the handler to be used when batches of messages are fetched
from the Kafka server. Batch handlers need to take care not to block
the event loop when dealing with large batches. It is better to process
records individually using the record handler.
363 364 365 366 367 368 369 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 363 def batch_handler if block_given? @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling batch_handler()" end |
- (void) beginning_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the first offset for the given partitions.
407 408 409 410 411 412 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 407 def beginning_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})" end |
- (void) close { ... }
This method returns an undefined value.
Close the consumer
373 374 375 376 377 378 379 380 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 373 def close if !block_given? return @j_del.java_method(:close, []).call() elsif block_given? return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling close()" end |
- (void) commit { ... }
This method returns an undefined value.
Commit current offsets for all subscribed topics and partitions.
328 329 330 331 332 333 334 335 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 328 def commit if !block_given? return @j_del.java_method(:commit, []).call() elsif block_given? return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling commit()" end |
- (void) committed(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last committed offset for the given partition (whether the commit happened by this process or another).
340 341 342 343 344 345 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 340 def committed(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})" end |
- (self) end_handler { ... }
129 130 131 132 133 134 135 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 129 def end_handler if block_given? @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield }) return self end raise ArgumentError, "Invalid arguments when calling end_handler()" end |
- (void) end_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
418 419 420 421 422 423 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 418 def end_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})" end |
- (self) exception_handler { ... }
47 48 49 50 51 52 53 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 47 def exception_handler if block_given? @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) })) return self end raise ArgumentError, "Invalid arguments when calling exception_handler()" end |
- (self) handler { ... }
56 57 58 59 60 61 62 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 56 def handler if block_given? @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling handler()" end |
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
This method returns an undefined value.
Look up the offset for the given partition by timestamp. Note: the result may be nil if no offset can be found for the given timestamp, e.g. when the timestamp refers to the future.
397 398 399 400 401 402 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 397 def offsets_for_times(topicPartition=nil,timestamp=nil) if topicPartition.class == Hash && timestamp.class == Fixnum && block_given? return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),timestamp,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})" end |
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer
247 248 249 250 251 252 253 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 247 def partitions_assigned_handler if block_given? @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" end |
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
350 351 352 353 354 355 356 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 350 def partitions_for(topic=nil) if topic.class == String && block_given? @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})" end |
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked from the consumer.
237 238 239 240 241 242 243 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 237 def partitions_revoked_handler if block_given? @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" end |
- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }
Suspend fetching from the requested partitions.
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 76 def pause(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:pause, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling pause(#{param_1})" end |
- (void) paused { ... }
This method returns an undefined value.
Get the set of partitions that were previously paused by a call to pause(Set).
228 229 230 231 232 233 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 228 def paused if block_given? return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) end raise ArgumentError, "Invalid arguments when calling paused()" end |
- (void) position(partition = nil) { ... }
This method returns an undefined value.
Get the offset of the next record that will be fetched (if a record with that offset exists).
385 386 387 388 389 390 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 385 def position(partition=nil) if partition.class == Hash && block_given? return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling position(#{partition})" end |
- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }
Resume specified partitions which have been paused with pause.
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 108 def resume(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:resume, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling resume(#{param_1})" end |
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
259 260 261 262 263 264 265 266 267 268 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 259 def seek(topicPartition=nil,offset=nil) if topicPartition.class == Hash && offset.class == Fixnum && !block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset) return self elsif topicPartition.class == Hash && offset.class == Fixnum && block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})" end |
- (self) seek_to_beginning(topicPartition) - (self) seek_to_beginning(topicPartitions) - (self) seek_to_beginning(topicPartition, completionHandler) { ... } - (self) seek_to_beginning(topicPartitions, completionHandler) { ... }
Seek to the first offset for each of the given partitions.
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 281 def seek_to_beginning(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})" end |
- (self) seek_to_end(topicPartition) - (self) seek_to_end(topicPartitions) - (self) seek_to_end(topicPartition, completionHandler) { ... } - (self) seek_to_end(topicPartitions, completionHandler) { ... }
Seek to the last offset for each of the given partitions.
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 309 def seek_to_end(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})" end |
- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }
Subscribe to the given list of topics to get dynamically assigned partitions.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 148 def subscribe(param_1=nil) if param_1.class == String && !block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element })) return self elsif param_1.class == String && block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})" end |
- (self) subscription { ... }
Get the current subscription.
218 219 220 221 222 223 224 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 218 def subscription if block_given? @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscription()" end |
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
205 206 207 208 209 210 211 212 213 214 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 205 def unsubscribe if !block_given? @j_del.java_method(:unsubscribe, []).call() return self elsif block_given? @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling unsubscribe()" end |