Class: VertxRabbitmq::RabbitMQConsumer
- Inherits:
-
Object
- Object
- VertxRabbitmq::RabbitMQConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb
Overview
A stream of messages from a rabbitmq queue.
Constant Summary
- @@j_api_type =
Object.new
Class Method Summary (collapse)
- + (Boolean) accept?(obj)
- + (Object) j_api_type
- + (Object) j_class
- + (Object) unwrap(obj)
- + (Object) wrap(obj)
Instance Method Summary (collapse)
-
- (void) cancel
Stop message consumption from a queue.
-
- (String) consumerTag
A consumer tag.
-
- (self) endHandler(endHandler) { ... }
Set an end handler.
-
- (self) exceptionHandler(exceptionHandler) { ... }
Set an exception handler on the read stream.
-
- (self) fetch(amount)
Fetch the specified amount of elements.
-
- (self) handler(messageArrived) { ... }
Set a message handler.
-
- (self) pause
Pause the stream of incoming messages from queue.
-
- (true, false) isPaused
Is the stream paused?.
-
- (::Vertx::Pipe) pipe
Pause this stream and return a to transfer the elements of this stream to a destination .
-
- (void) pipeTo(dst, handler) { ... }
Pipe this ReadStream to the WriteStream.
-
- (self) resume
Resume reading from a queue.
Class Method Details
+ (Boolean) accept?(obj)
24 25 26 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 24 def @@j_api_type.accept?(obj) obj.class == RabbitMQConsumer end |
+ (Object) j_api_type
33 34 35 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 33 def self.j_api_type @@j_api_type end |
+ (Object) j_class
36 37 38 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 36 def self.j_class Java::IoVertxRabbitmq::RabbitMQConsumer.java_class end |
+ (Object) unwrap(obj)
30 31 32 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 30 def @@j_api_type.unwrap(obj) obj.j_del end |
+ (Object) wrap(obj)
27 28 29 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 27 def @@j_api_type.wrap(obj) RabbitMQConsumer.new(obj) end |
Instance Method Details
- (void) cancel - (void) cancel(cancelResult) { ... }
This method returns an undefined value.
Stop message consumption from a queue.The operation is asynchronous. When consumption will be stopped, you can by notified via #end_handler
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 46 def cancel if !block_given? return @j_del.java_method(:cancel, []).call() elsif true if (block_given?) return @j_del.java_method(:cancel, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise) else promise = ::Vertx::Util::Utils.promise @j_del.java_method(:cancel, [Java::IoVertxCore::Handler.java_class]).call(block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise) return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil) end end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling cancel()" end end |
- (String) consumerTag
Returns a consumer tag
92 93 94 95 96 97 98 99 100 101 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 92 def consumer_tag if !block_given? return @j_del.java_method(:consumerTag, []).call() end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling consumer_tag()" end end |
- (self) endHandler(endHandler) { ... }
Set an end handler. Once the stream has canceled successfully, the handler will be called.
147 148 149 150 151 152 153 154 155 156 157 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 147 def end_handler if true @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(nil) unless !block_given? })) return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling end_handler()" end end |
- (self) exceptionHandler(exceptionHandler) { ... }
Set an exception handler on the read stream.
194 195 196 197 198 199 200 201 202 203 204 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 194 def exception_handler if true @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) unless !block_given? })) return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling exception_handler()" end end |
- (self) fetch(amount)
Fetch the specified
amount
of elements. If the ReadStream
has been paused, reading will
recommence with the specified amount
of items, otherwise the specified amount
will
be added to the current stream demand.
164 165 166 167 168 169 170 171 172 173 174 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 164 def fetch(*args) if args[0].class == Fixnum && !block_given? @j_del.java_method(:fetch, [Java::long.java_class]).call(args[0]) return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling fetch(#{args[0]})" end end |
- (self) handler(messageArrived) { ... }
Set a message handler. As message appear in a queue, the handler will be called with the message.
106 107 108 109 110 111 112 113 114 115 116 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 106 def handler if true @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxRabbitmq::RabbitMQMessage)) unless !block_given? })) return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling handler()" end end |
- (self) pause
Pause the stream of incoming messages from queue.
The messages will continue to arrive, but they will be stored in a internal queue. If the queue size would exceed the limit provided by , then incoming messages will be discarded.
211 212 213 214 215 216 217 218 219 220 221 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 211 def pause if !block_given? @j_del.java_method(:pause, []).call() return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling pause()" end end |
- (true, false) isPaused
Returns is the stream paused?
80 81 82 83 84 85 86 87 88 89 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 80 def paused? if !block_given? return @j_del.java_method(:isPaused, []).call() end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling paused?()" end end |
- (::Vertx::Pipe) pipe
Pause this stream and return a to transfer the elements of this stream to a destination .
The stream will be resumed when the pipe will be wired to a
WriteStream
.
180 181 182 183 184 185 186 187 188 189 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 180 def pipe if !block_given? return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pipe, []).call(),::Vertx::Pipe,::VertxRabbitmq::RabbitMQMessage.j_api_type) end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling pipe()" end end |
- (void) pipeTo(dst, handler) { ... }
This method returns an undefined value.
Pipe thisReadStream
to the WriteStream
.
Elements emitted by this stream will be written to the write stream until this stream ends or fails.
Once this stream has ended or failed, the write stream will be ended and the handler
will be
called with the result.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 127 def pipe_to(*args) if args[0].class.method_defined?(:j_del) && true if (block_given?) return @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0].j_del,block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise) else promise = ::Vertx::Util::Utils.promise @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class]).call(args[0].j_del,block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? nil : nil) } : promise) return ::Vertx::Util::Utils.safe_create(promise.future(),::Vertx::Future, nil) end end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling pipe_to(#{args[0]})" end end |
- (self) resume
Resume reading from a queue. Flushes internal queue.
67 68 69 70 71 72 73 74 75 76 77 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-rabbitmq/rabbit_mq_consumer.rb', line 67 def resume if !block_given? @j_del.java_method(:resume, []).call() return self end if defined?(super) super else raise ArgumentError, "Invalid arguments when calling resume()" end end |