Class: VertxKafkaClient::KafkaProducer

Inherits:
Object
  • Object
show all
Includes:
Vertx::WriteStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb

Overview

Vert.x Kafka producer.

The Vertx::WriteStream#write provides global control over writing a record.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaProducer) create(vertx, config) + (::VertxKafkaClient::KafkaProducer) create(vertx, config, keyType, valueType)

Create a new KafkaProducer instance

Overloads:

  • + (::VertxKafkaClient::KafkaProducer) create(vertx, config)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • config (Hash{String => String})
      Kafka producer configuration
  • + (::VertxKafkaClient::KafkaProducer) create(vertx, config, keyType, valueType)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • config (Hash{String => String})
      Kafka producer configuration
    • keyType (Nil)
      class type for the key serialization
    • valueType (Nil)
      class type for the value serialization

Returns:



130
131
132
133
134
135
136
137
138
139
140
141
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 130

# Create a new KafkaProducer instance.
#
# The overload is selected from the runtime classes of the arguments:
#   create(vertx, config)
#   create(vertx, config, keyType, valueType)
# +vertx+ must be an object whose class defines +j_del+, +config+ must be a
# Hash, and +keyType+/+valueType+ must be either both nil/omitted or both
# Class objects. A call that passes a block never matches an overload.
#
# @param args [Array] the overload arguments described above
# @return the result of Utils.safe_create applied to the Java producer
# @raise [ArgumentError] when no overload matches and no super implementation exists
def self.create(*args)
  vertx, config, key_type, value_type = args

  if !block_given? && vertx.class.method_defined?(:j_del) && config.class == Hash
    if key_type.nil? && value_type.nil?
      j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :create,
        [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class]
      )
      # The configuration is handed to Java as a fresh Hash copy.
      j_producer = j_create.call(vertx.j_del, config.map { |k, v| [k, v] }.to_h)
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif key_type.class == Class && value_type.class == Class
      j_create = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :create,
        [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class,
         Java::JavaLang::Class.java_class, Java::JavaLang::Class.java_class]
      )
      j_producer = j_create.call(
        vertx.j_del,
        config.map { |k, v| [k, v] }.to_h,
        ::Vertx::Util::Utils.j_class_of(key_type),
        ::Vertx::Util::Utils.j_class_of(value_type)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(key_type),
        ::Vertx::Util::Utils.v_type_of(value_type)
      )
    end
  end

  # No overload matched: defer to an inherited implementation when there is one.
  return super if defined?(super)

  raise ArgumentError, "Invalid arguments when calling create(#{args[0]},#{args[1]},#{args[2]},#{args[3]})"
end

+ (::VertxKafkaClient::KafkaProducer) createShared(vertx, name, config) + (::VertxKafkaClient::KafkaProducer) createShared(vertx, name, config, keyType, valueType)

Get or create a KafkaProducer instance which shares its stream with any other KafkaProducer created with the same name

Overloads:

  • + (::VertxKafkaClient::KafkaProducer) createShared(vertx, name, config)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • name (String)
      the producer name to identify it
    • config (Hash{String => String})
      Kafka producer configuration
  • + (::VertxKafkaClient::KafkaProducer) createShared(vertx, name, config, keyType, valueType)

    Parameters:

    • vertx (::Vertx::Vertx)
      Vert.x instance to use
    • name (String)
      the producer name to identify it
    • config (Hash{String => String})
      Kafka producer configuration
    • keyType (Nil)
      class type for the key serialization
    • valueType (Nil)
      class type for the value serialization

Returns:



108
109
110
111
112
113
114
115
116
117
118
119
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 108

# Get or create a KafkaProducer instance which shares its stream with any
# other KafkaProducer created with the same name.
#
# The overload is selected from the runtime classes of the arguments:
#   create_shared(vertx, name, config)
#   create_shared(vertx, name, config, keyType, valueType)
# +vertx+ must be an object whose class defines +j_del+, +name+ a String,
# +config+ a Hash, and +keyType+/+valueType+ either both nil/omitted or both
# Class objects. A call that passes a block never matches an overload.
#
# @param args [Array] the overload arguments described above
# @return the result of Utils.safe_create applied to the Java producer
# @raise [ArgumentError] when no overload matches and no super implementation exists
def self.create_shared(*args)
  vertx, name, config, key_type, value_type = args

  if !block_given? && vertx.class.method_defined?(:j_del) && name.class == String && config.class == Hash
    if key_type.nil? && value_type.nil?
      j_create_shared = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :createShared,
        [Java::IoVertxCore::Vertx.java_class, Java::java.lang.String.java_class, Java::JavaUtil::Map.java_class]
      )
      # The configuration is handed to Java as a fresh Hash copy.
      j_producer = j_create_shared.call(vertx.j_del, name, config.map { |k, v| [k, v] }.to_h)
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif key_type.class == Class && value_type.class == Class
      j_create_shared = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :createShared,
        [Java::IoVertxCore::Vertx.java_class, Java::java.lang.String.java_class, Java::JavaUtil::Map.java_class,
         Java::JavaLang::Class.java_class, Java::JavaLang::Class.java_class]
      )
      j_producer = j_create_shared.call(
        vertx.j_del,
        name,
        config.map { |k, v| [k, v] }.to_h,
        ::Vertx::Util::Utils.j_class_of(key_type),
        ::Vertx::Util::Utils.j_class_of(value_type)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(key_type),
        ::Vertx::Util::Utils.v_type_of(value_type)
      )
    end
  end

  # No overload matched: defer to an inherited implementation when there is one.
  return super if defined?(super)

  raise ArgumentError, "Invalid arguments when calling create_shared(#{args[0]},#{args[1]},#{args[2]},#{args[3]},#{args[4]})"
end

Instance Method Details

- (void) close(completionHandler) { ... } - (void) close(timeout, completionHandler) { ... }

This method returns an undefined value.

Close the producer

Overloads:

  • - (void) close(completionHandler) { ... }

    Yields:

    • handler called on operation completed
  • - (void) close(timeout, completionHandler) { ... }

    Parameters:

    • timeout (Fixnum)
      timeout to wait for closing

    Yields:

    • handler called on operation completed


180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 180

# Close the producer.
#
# Overloads:
#   close { |err, res| ... }            close without a timeout
#   close(timeout) { |err, res| ... }   close, passing +timeout+ to the delegate
# When no block is given, a promise is used as the completion handler and its
# future is returned wrapped as a ::Vertx::Future.
#
# @param args [Array] optionally a single Integer timeout to wait for closing
# @yield [err, res] called on operation completed; +err+ is the failure cause
#   (nil on success) and +res+ is always nil
# @return the delegate's return value when a block is given, otherwise the
#   wrapped future
# @raise [ArgumentError] when the first argument is neither nil nor an Integer
#   and no super implementation exists
def close(*args)
  timeout = args[0]

  if timeout.nil?
    signature = [Java::IoVertxCore::Handler.java_class]
    leading = []
  elsif timeout.is_a?(Integer)
    # Integer rather than Fixnum: Fixnum was deprecated in Ruby 2.4 and removed
    # in 3.2, where referencing it raises NameError. Fixnum instances are
    # Integers on every Ruby version, so existing callers still match.
    signature = [Java::long.java_class, Java::IoVertxCore::Handler.java_class]
    leading = [timeout]
  else
    return super if defined?(super)

    raise ArgumentError, "Invalid arguments when calling close(#{args[0]})"
  end

  j_close = @j_del.java_method(:close, signature)

  if block_given?
    return j_close.call(*leading, Proc.new { |ar| yield(ar.failed ? ar.cause : nil, nil) })
  end

  promise = ::Vertx::Util::Utils.promise
  j_close.call(*leading, promise)
  ::Vertx::Util::Utils.safe_create(promise.future, ::Vertx::Future, nil)
end

- (self) drainHandler(handler) { ... }

Yields:

Returns:

  • (self)


50
51
52
53
54
55
56
57
58
59
60
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 50

# Register a drain handler on the delegate.
#
# The Java handler ignores the event it receives and yields nil to the given
# block; when no block was supplied the handler does nothing.
#
# @yield [nil] invoked each time the delegate calls the handler
# @return [self]
def drain_handler
  # The previous version wrapped this in `if true` and followed it with an
  # unreachable super/raise tail; both were dead code and are removed.
  on_drain = Proc.new { |_event| yield(nil) if block_given? }
  @j_del.java_method(:drainHandler, [Java::IoVertxCore::Handler.java_class]).call(on_drain)
  self
end

- (void) end(handler) { ... } - (void) end(data, handler) { ... }

This method returns an undefined value.

Same as #end, but with a handler called when the operation completes

Overloads:



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 149

# End the stream, optionally writing a final record first.
#
# Overloads:
#   end { |err, res| ... }         end without data
#   end(data) { |err, res| ... }   +data+ is any object whose class defines
#                                  +j_del+; its delegate is passed to Java
# When no block is given, a promise is used as the completion handler and its
# future is returned wrapped as a ::Vertx::Future.
#
# @param args [Array] optionally a single record wrapper
# @yield [err, res] called when the operation completes; +err+ is the failure
#   cause (nil on success) and +res+ is always nil
# @return the delegate's return value when a block is given, otherwise the
#   wrapped future
# @raise [ArgumentError] when the first argument matches no overload and no
#   super implementation exists
def end(*args)
  data = args[0]

  if data.nil?
    signature = [Java::IoVertxCore::Handler.java_class]
    leading = []
  elsif data.class.method_defined?(:j_del)
    signature = [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class,
                 Java::IoVertxCore::Handler.java_class]
    leading = [data.j_del]
  else
    return super if defined?(super)

    raise ArgumentError, "Invalid arguments when calling end(#{args[0]})"
  end

  j_end = @j_del.java_method(:end, signature)

  # The previous version repeated a `block_given? ? proc : promise` ternary in
  # both branches even though only one side could ever be taken in each.
  if block_given?
    return j_end.call(*leading, Proc.new { |ar| yield(ar.failed ? ar.cause : nil, nil) })
  end

  promise = ::Vertx::Util::Utils.promise
  j_end.call(*leading, promise)
  ::Vertx::Util::Utils.safe_create(promise.future, ::Vertx::Future, nil)
end

- (self) exceptionHandler(handler) { ... }

Yields:

Returns:

  • (self)


229
230
231
232
233
234
235
236
237
238
239
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 229

# Register an exception handler on the delegate.
#
# The Java handler converts the event it receives with
# ::Vertx::Util::Utils.from_throwable and yields the result to the given
# block; when no block was supplied the handler does nothing.
#
# @yield [error] the converted throwable
# @return [self]
def exception_handler
  # The previous version wrapped this in `if true` and followed it with an
  # unreachable super/raise tail; both were dead code and are removed.
  on_error = Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) if block_given? }
  @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call(on_error)
  self
end

- (self) flush(completionHandler) { ... }

Invoking this method makes all buffered records immediately available to write

Yields:

  • handler called on operation completed

Returns:

  • (self)


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 30

# Invoking this method makes all buffered records immediately available to write.
#
# With a block, the block is used as the completion handler and self is
# returned. Without a block, a promise is used as the completion handler and
# its future is returned wrapped as a ::Vertx::Future.
#
# @yield [err, res] called on operation completed; +err+ is the failure cause
#   (nil on success) and +res+ is always nil
# @return [self] when a block is given, otherwise the wrapped future
def flush
  # The previous version wrapped the body in `if true`, carried an unreachable
  # super/raise tail, and repeated a `block_given? ? proc : promise` ternary in
  # branches where only one side could run; all of that was dead code.
  j_flush = @j_del.java_method(:flush, [Java::IoVertxCore::Handler.java_class])

  if block_given?
    j_flush.call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, nil) })
    return self
  end

  promise = ::Vertx::Util::Utils.promise
  j_flush.call(promise)
  ::Vertx::Util::Utils.safe_create(promise.future, ::Vertx::Future, nil)
end

- (self) partitionsFor(topic, handler) { ... }

Get the partition metadata for the given topic.

Parameters:

  • topic (String)
    topic for which to get partition info

Yields:

  • handler called on operation completed

Returns:

  • (self)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 66

# Get the partition metadata for the given topic.
#
# With a block, the block is used as the completion handler and self is
# returned. Without a block, a promise is used as the completion handler and
# its future is returned wrapped as a ::Vertx::Future.
#
# @param args [Array] a single String: the topic to look up
# @yield [err, partitions] called on operation completed; +err+ is the failure
#   cause (nil on success); +partitions+ is nil on failure, otherwise an Array
#   in which each non-nil element has been converted with
#   JSON.parse(elt.toJson.encode)
# @return [self] when a block is given, otherwise the wrapped future
# @raise [ArgumentError] when the first argument is not a String and no super
#   implementation exists
def partitions_for(*args)
  topic = args[0]

  unless topic.class == String
    return super if defined?(super)

    raise ArgumentError, "Invalid arguments when calling partitions_for(#{args[0]})"
  end

  j_partitions_for = @j_del.java_method(
    :partitionsFor,
    [Java::java.lang.String.java_class, Java::IoVertxCore::Handler.java_class]
  )

  # The previous version repeated a `block_given? ? proc : promise` ternary in
  # both branches even though only one side could ever be taken in each.
  if block_given?
    on_done = Proc.new do |ar|
      partitions =
        if ar.succeeded
          ar.result.to_a.map { |elt| elt.nil? ? nil : JSON.parse(elt.toJson.encode) }
        end
      yield(ar.failed ? ar.cause : nil, partitions)
    end
    j_partitions_for.call(topic, on_done)
    return self
  end

  promise = ::Vertx::Util::Utils.promise
  j_partitions_for.call(topic, promise)
  ::Vertx::Util::Utils.safe_create(promise.future, ::Vertx::Future, nil)
end

- (self) send(record, handler) { ... }

Asynchronously write a record to a topic

Parameters:

Yields:

  • handler called on operation completed

Returns:

  • (self)


209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 209

# Asynchronously write a record to a topic.
#
# With a block, the block is used as the completion handler and self is
# returned. Without a block, a promise is used as the completion handler and
# its future is returned wrapped as a ::Vertx::Future.
#
# Because this method shares its name with Object#send, any call whose first
# argument is not a record wrapper falls through to the inherited
# implementation via +super+.
#
# @param args [Array] a single record: any object whose class defines +j_del+
# @yield [err, metadata] called on operation completed; +err+ is the failure
#   cause (nil on success); +metadata+ is JSON.parse(result.toJson.encode) when
#   the operation succeeded with a non-nil result, otherwise nil
# @return [self] when a block is given, otherwise the wrapped future
# @raise [ArgumentError] when the first argument is not a record wrapper and
#   no super implementation exists
def send(*args)
  record = args[0]

  unless record.class.method_defined?(:j_del)
    return super if defined?(super)

    raise ArgumentError, "Invalid arguments when calling send(#{args[0]})"
  end

  j_send = @j_del.java_method(
    :send,
    [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class, Java::IoVertxCore::Handler.java_class]
  )

  # The previous version repeated a `block_given? ? proc : promise` ternary in
  # both branches even though only one side could ever be taken in each.
  if block_given?
    on_done = Proc.new do |ar|
      metadata = nil
      if ar.succeeded
        result = ar.result
        metadata = JSON.parse(result.toJson.encode) unless result.nil?
      end
      yield(ar.failed ? ar.cause : nil, metadata)
    end
    j_send.call(record.j_del, on_done)
    return self
  end

  promise = ::Vertx::Util::Utils.promise
  j_send.call(record.j_del, promise)
  ::Vertx::Util::Utils.safe_create(
    promise.future,
    ::Vertx::Future,
    ::Vertx::Util::data_object_type(Java::IoVertxKafkaClientProducer::RecordMetadata)
  )
end

- (self) setWriteQueueMaxSize(i)

Parameters:

  • i (Fixnum)

Returns:

  • (self)


264
265
266
267
268
269
270
271
272
273
274
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 264

# Set the maximum size of the write queue on the delegate.
#
# @param args [Array] a single Integer: the new maximum size
# @return [self]
# @raise [ArgumentError] when the argument is not an Integer or a block is
#   given, and no super implementation exists
def set_write_queue_max_size(*args)
  size = args[0]

  # Integer rather than Fixnum: Fixnum was deprecated in Ruby 2.4 and removed
  # in 3.2, where referencing it raises NameError. Fixnum instances are
  # Integers on every Ruby version, so existing callers still match.
  if size.is_a?(Integer) && !block_given?
    @j_del.java_method(:setWriteQueueMaxSize, [Java::int.java_class]).call(size)
    return self
  end

  return super if defined?(super)

  raise ArgumentError, "Invalid arguments when calling set_write_queue_max_size(#{args[0]})"
end

- (void) write(data, handler) { ... }

This method returns an undefined value.

Same as #write, but with a handler called when the operation completes

Parameters:

Yields:



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 245

# Write a record, with a handler called when the operation completes.
#
# When no block is given, a promise is used as the completion handler and its
# future is returned wrapped as a ::Vertx::Future.
#
# @param args [Array] a single record: any object whose class defines +j_del+
# @yield [err, res] called when the operation completes; +err+ is the failure
#   cause (nil on success) and +res+ is always nil
# @return the delegate's return value when a block is given, otherwise the
#   wrapped future
# @raise [ArgumentError] when the first argument is not a record wrapper and
#   no super implementation exists
def write(*args)
  record = args[0]

  unless record.class.method_defined?(:j_del)
    return super if defined?(super)

    raise ArgumentError, "Invalid arguments when calling write(#{args[0]})"
  end

  j_write = @j_del.java_method(
    :write,
    [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class, Java::IoVertxCore::Handler.java_class]
  )

  # The previous version repeated a `block_given? ? proc : promise` ternary in
  # both branches even though only one side could ever be taken in each.
  if block_given?
    return j_write.call(record.j_del, Proc.new { |ar| yield(ar.failed ? ar.cause : nil, nil) })
  end

  promise = ::Vertx::Util::Utils.promise
  j_write.call(record.j_del, promise)
  ::Vertx::Util::Utils.safe_create(promise.future, ::Vertx::Future, nil)
end

- (true, false) writeQueueFull

This will return true if there are more bytes in the write queue than the value set using #set_write_queue_max_size

Returns:

  • (true, false)
    true if write queue is full


86
87
88
89
90
91
92
93
94
95
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 86

# Ask the delegate whether the write queue is full.
#
# @return the value returned by the delegate's writeQueueFull method
# @raise [ArgumentError] when a block is given and no super implementation exists
def write_queue_full?
  return @j_del.java_method(:writeQueueFull, []).call unless block_given?

  # A block matches no overload: defer to an inherited implementation if any.
  return super if defined?(super)

  raise ArgumentError, "Invalid arguments when calling write_queue_full?()"
end