Class: Concurrent::Promises::Channel
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::Promises::Channel
- Defined in:
- lib-edge/concurrent/edge/channel.rb
Overview
Edge Features are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in
concurrent-ruby-edge
are expected to move toconcurrent-ruby
when finalised.
A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure, use capacity option in #initialize. #pop method blocks ans #pop_op returns pending future if there is no message in the channel. If the capacity is limited the #push method blocks and #push_op returns pending future.
Examples
Let's start by creating a channel with a capacity of 2 messages.
ch = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>
We push 3 messages, then it can be observed that the last thread pushing is sleeping since the channel is full.
threads = Array.new(3) { |i| Thread.new { ch.push message: i } }
sleep 0.01 # let the threads run
threads
# => [#<Thread:0x000003@channel.in.md:14 dead>,
# #<Thread:0x000004@channel.in.md:14 dead>,
# #<Thread:0x000005@channel.in.md:14 sleep_forever>]
When message is popped the last thread continues and finishes as well.
ch.pop # => {:message=>0}
threads.map(&:join)
# => [#<Thread:0x000003@channel.in.md:14 dead>,
# #<Thread:0x000004@channel.in.md:14 dead>,
# #<Thread:0x000005@channel.in.md:14 dead>]
Same principle applies to popping as well. There are now 2 messages int he channel. Lets create 3 threads trying to pop a message, one will be blocked until new messages is pushed.
threads = Array.new(3) { |i| Thread.new { ch.pop } }
sleep 0.01 # let the threads run
threads
# => [#<Thread:0x000006@channel.in.md:32 dead>,
# #<Thread:0x000007@channel.in.md:32 dead>,
# #<Thread:0x000008@channel.in.md:32 sleep_forever>]
ch.push message: 3
# => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>
threads.map(&:value)
# => [{:message=>1}, {:message=>2}, {:message=>3}]
Promises integration
However this channel is implemented to integrate with promises therefore all operations can be represented as futures.
ch = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
push_operations = Array.new(3) { |i| ch.push_op message: i }
# => [#<Concurrent::Promises::Future:0x00000a fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
# #<Concurrent::Promises::Future:0x00000b fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
# #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]
We do not have to sleep here letting the futures execute as Threads. Since there is capacity for 2 messages the Promises are immediately resolved without ever allocating a Thread to execute. Push and pop operations are often more efficient. The remaining pending push operation will also never require another thread, instead it will resolve when a message is popped from the channel making a space for a new message.
ch.pop_op.value! # => {:message=>0}
push_operations.map(&:value!)
# => [#<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
# #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
# #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>]
pop_operations = Array.new(3) { |i| ch.pop_op }
# => [#<Concurrent::Promises::ResolvableFuture:0x00000d fulfilled with {:message=>1}>,
# #<Concurrent::Promises::ResolvableFuture:0x00000e fulfilled with {:message=>2}>,
# #<Concurrent::Promises::ResolvableFuture:0x00000f pending>]
ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op)
pop_operations.map(&:value)
# => [{:message=>1}, {:message=>2}, {:message=>3}]
Selecting over channels
A selection over channels can be created with the .select_channel
factory method. It
will be fulfilled with a first message available in any of the channels. It
returns a pair to be able to find out which channel had the message available.
ch1 = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
ch2 = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>
ch1.push 1
# => #<Concurrent::Promises::Channel:0x000010 capacity taken 1 of 2>
ch2.push 2
# => #<Concurrent::Promises::Channel:0x000011 capacity taken 1 of 2>
Concurrent::Promises::Channel.select([ch1, ch2])
# => [#<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>, 1]
ch1.select(ch2)
# => [#<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>, 2]
Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1)
# => #<Concurrent::Promises::Future:0x000012 pending>
Concurrent::Promises::Channel.
# or `ch1.select_op(ch2)` would be equivalent
select_op([ch1, ch2]).
then('got number %03d from ch%d') { |(channel, value), format|
format format, value, [ch1, ch2].index(channel).succ
}.value! # => "got number 007 from ch1"
try_
variants
All blocking operations (#pop, #push, #select) have non-blocking variant
with try_
prefix.
They always return immediately and indicate either success or failure.
ch
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
ch.try_push 1 # => true
ch.try_push 2 # => true
ch.try_push 3 # => false
ch.try_pop # => 1
ch.try_pop # => 2
ch.try_pop # => nil
Timeouts
All blocking operations (#pop, #push, #select) have a timeout option.
Similar to try_
variants it will indicate success or timing out,
when the timeout option is used.
ch
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
ch.push 1, 0.01 # => true
ch.push 2, 0.01 # => true
ch.push 3, 0.01 # => false
ch.pop 0.01 # => 1
ch.pop 0.01 # => 2
ch.pop 0.01 # => nil
Backpressure
Most importantly the channel can be used to create systems with backpressure. A self adjusting system where the producers will slow down if the consumers are not keeping up.
channel = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x000013 capacity taken 0 of 2>
log = Concurrent::Array.new # => []
producers = Array.new 2 do |i|
Thread.new(i) do |i|
4.times do |j|
log.push format "producer %d pushing %d", i, j
channel.push [i, j]
end
end
end
# => [#<Thread:0x000014@channel.in.md:133 run>,
# #<Thread:0x000015@channel.in.md:133 run>]
consumers = Array.new 4 do |i|
Thread.new(i) do |consumer|
2.times do |j|
from, = channel.pop
log.push format "consumer %d got %d. payload %d from producer %d",
consumer, j, , from
do_stuff
end
end
end
# => [#<Thread:0x000016@channel.in.md:142 run>,
# #<Thread:0x000017@channel.in.md:142 run>,
# #<Thread:0x000018@channel.in.md:142 run>,
# #<Thread:0x000019@channel.in.md:142 run>]
# wait for all to finish
producers.map(&:join)
# => [#<Thread:0x000014@channel.in.md:133 dead>,
# #<Thread:0x000015@channel.in.md:133 dead>]
consumers.map(&:join)
# => [#<Thread:0x000016@channel.in.md:142 dead>,
# #<Thread:0x000017@channel.in.md:142 dead>,
# #<Thread:0x000018@channel.in.md:142 dead>,
# #<Thread:0x000019@channel.in.md:142 dead>]
# investigate log
log
# => ["producer 0 pushing 0",
# "producer 0 pushing 1",
# "producer 0 pushing 2",
# "producer 1 pushing 0",
# "consumer 0 got 0. payload 0 from producer 0",
# "consumer 1 got 0. payload 1 from producer 0",
# "producer 1 pushing 1",
# "consumer 3 got 0. payload 2 from producer 0",
# "producer 1 pushing 2",
# "consumer 2 got 0. payload 0 from producer 1",
# "producer 1 pushing 3",
# "producer 0 pushing 3",
# "consumer 0 got 1. payload 1 from producer 1",
# "consumer 1 got 1. payload 2 from producer 1",
# "consumer 3 got 1. payload 3 from producer 1",
# "consumer 2 got 1. payload 3 from producer 0"]
The producers are much faster than consumers
(since they do_stuff
which takes some time)
but as it can be seen from the log they fill the channel
and then they slow down
until there is space available in the channel.
If permanent allocation of threads to the producers and consumers has to be avoided, the threads can be replaced with promises that run a thread pool.
channel = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x00001a capacity taken 0 of 2>
log = Concurrent::Array.new # => []
def produce(channel, log, producer, i)
log.push format "producer %d pushing %d", producer, i
channel.push_op([producer, i]).then do
i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done
end
end # => :produce
def consume(channel, log, consumer, i)
channel.pop_op.then(consumer, i) do |(from, ), consumer, i|
log.push format "consumer %d got %d. payload %d from producer %d",
consumer, i, , from
do_stuff
i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done
end
end # => :consume
producers = Array.new 2 do |i|
Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run
end
# => [#<Concurrent::Promises::Future:0x00001b pending>,
# #<Concurrent::Promises::Future:0x00001c pending>]
consumers = Array.new 4 do |i|
Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run
end
# => [#<Concurrent::Promises::Future:0x00001d pending>,
# #<Concurrent::Promises::Future:0x00001e pending>,
# #<Concurrent::Promises::Future:0x00001f pending>,
# #<Concurrent::Promises::Future:0x000020 pending>]
# wait for all to finish
producers.map(&:value!) # => [:done, :done]
consumers.map(&:value!) # => [:done, :done, :done, :done]
# investigate log
log
# => ["producer 0 pushing 0",
# "producer 1 pushing 0",
# "producer 0 pushing 1",
# "producer 1 pushing 1",
# "consumer 0 got 0. payload 0 from producer 0",
# "consumer 1 got 0. payload 0 from producer 1",
# "consumer 2 got 0. payload 1 from producer 0",
# "producer 0 pushing 2",
# "consumer 3 got 0. payload 1 from producer 1",
# "producer 1 pushing 2",
# "producer 0 pushing 3",
# "producer 1 pushing 3",
# "consumer 0 got 1. payload 2 from producer 1",
# "consumer 2 got 1. payload 3 from producer 0",
# "consumer 1 got 1. payload 2 from producer 0",
# "consumer 3 got 1. payload 3 from producer 1"]
Synchronization of workers by passing a value
If the capacity of the channel is zero then any push operation will succeed only when there is a matching pop operation which can take the message. The operations have to be paired to succeed.
channel = Concurrent::Promises::Channel.new 0
# => #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>
thread = Thread.new { channel.pop }; sleep 0.01
# allow the thread to go to sleep
thread
# => #<Thread:0x000022@channel.in.md:214 sleep_forever>
# succeeds because there is matching pop operation waiting in the thread
channel.try_push(:v1) # => true
# remains pending, since there is no matching operation
push = channel.push_op(:v2)
# => #<Concurrent::Promises::ResolvableFuture:0x000023 pending>
thread.value # => :v1
# the push operation resolves as a pairing pop is called
channel.pop # => :v2
push
# => #<Concurrent::Promises::ResolvableFuture:0x000023 fulfilled with #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>>
Constant Summary collapse
- UNLIMITED_CAPACITY =
Default capacity of the Channel, makes it accept unlimited number of messages.
::Object.new
- ANY =
An object which matches anything (with #===)
Object.new.tap do |any| def any.===(other) true end def any.to_s 'ANY' end end
Class Method Summary collapse
- .select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
- .select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
- .select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
- .select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
- .try_select(channels) ⇒ ::Array(Channel, Object)
- .try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)
Instance Method Summary collapse
-
#capacity ⇒ Integer
Maximum capacity of the Channel.
-
#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel
constructor
Create channel.
-
#peek(no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel.
-
#peek_matching(matcher, no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel.
-
#pop(timeout = nil, timeout_value = nil) ⇒ Object, nil
Blocks current thread until a message is available in the channel for popping.
-
#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object, nil
Blocks current thread until a message is available in the channel for popping.
-
#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
-
#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
-
#push(message, timeout = nil) ⇒ self, true, false
Blocks current thread until the message is pushed into the channel.
-
#push_op(message) ⇒ ResolvableFuture(self)
Returns future which will fulfill when the message is pushed to the channel.
-
#select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
-
#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
-
#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair.
-
#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair.
-
#size ⇒ Integer
The number of messages currently stored in the channel.
-
#to_s ⇒ String
(also: #inspect)
Short string representation.
-
#try_pop(no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
-
#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
-
#try_push(message) ⇒ true, false
Push the message into the channel if there is space available.
-
#try_select(channels) ⇒ ::Array(Channel, Object), nil
If message is available in the receiver or any of the provided channels the channel message pair is returned.
-
#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object), nil
If message is available in the receiver or any of the provided channels the channel message pair is returned.
Constructor Details
#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel
Create channel.
61 62 63 64 65 66 67 68 69 |
# File 'lib-edge/concurrent/edge/channel.rb', line 61 def initialize(capacity = UNLIMITED_CAPACITY) super() @Capacity = capacity @Mutex = Mutex.new # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle @Probes = [] @Messages = [] @PendingPush = [] end |
Class Method Details
.select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
319 320 321 |
# File 'lib-edge/concurrent/edge/channel.rb', line 319 def select(channels, timeout = nil) channels.first.select(channels[1..-1], timeout) end |
.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
337 338 339 |
# File 'lib-edge/concurrent/edge/channel.rb', line 337 def select_matching(matcher, channels, timeout = nil) channels.first.select_matching(matcher, channels[1..-1], timeout) end |
.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
313 314 315 |
# File 'lib-edge/concurrent/edge/channel.rb', line 313 def select_op(channels, probe = Promises.resolvable_future) channels.first.select_op(channels[1..-1], probe) end |
.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
331 332 333 |
# File 'lib-edge/concurrent/edge/channel.rb', line 331 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) channels.first.select_op_matching(matcher, channels[1..-1], probe) end |
.try_select(channels) ⇒ ::Array(Channel, Object)
307 308 309 |
# File 'lib-edge/concurrent/edge/channel.rb', line 307 def try_select(channels) channels.first.try_select(channels[1..-1]) end |
.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)
325 326 327 |
# File 'lib-edge/concurrent/edge/channel.rb', line 325 def try_select_matching(matcher, channels) channels.first.try_select_matching(matcher, channels[1..-1]) end |
Instance Method Details
#capacity ⇒ Integer
Returns Maximum capacity of the Channel.
292 293 294 |
# File 'lib-edge/concurrent/edge/channel.rb', line 292 def capacity @Capacity end |
#peek(no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel
206 207 208 |
# File 'lib-edge/concurrent/edge/channel.rb', line 206 def peek(no_value = nil) peek_matching ANY, no_value end |
#peek_matching(matcher, no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel
212 213 214 215 216 217 218 219 |
# File 'lib-edge/concurrent/edge/channel.rb', line 212 def peek_matching(matcher, no_value = nil) @Mutex.synchronize do = matcher, false return if != NOTHING = ns_consume_pending_push matcher, false return != NOTHING ? : no_value end end |
#pop(timeout = nil, timeout_value = nil) ⇒ Object, nil
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until a message is available in the channel for popping.
174 175 176 |
# File 'lib-edge/concurrent/edge/channel.rb', line 174 def pop(timeout = nil, timeout_value = nil) pop_matching ANY, timeout, timeout_value end |
#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object, nil
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until a message is available in the channel for popping.
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib-edge/concurrent/edge/channel.rb', line 180 def pop_matching(matcher, timeout = nil, timeout_value = nil) # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer # that it returns even if the buffer is full. User might expect that it has to be in the buffer first. probe = @Mutex.synchronize do = matcher if == NOTHING = ns_consume_pending_push matcher return if != NOTHING else = ns_consume_pending_push ANY @Messages.push unless == NOTHING return end probe = Promises.resolvable_future @Probes.push probe, false, matcher probe end probe.value!(timeout, timeout_value, [true, timeout_value, nil]) end |
#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1)
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
pop_op = channel.pop_op
if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end
or the operation can be prevented from completion after timing out by using
channel.pop_op.wait(1, [true, nil, nil])
.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.
157 158 159 |
# File 'lib-edge/concurrent/edge/channel.rb', line 157 def pop_op(probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(ANY, probe, false) } end |
#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1)
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
pop_op = channel.pop_op
if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end
or the operation can be prevented from completion after timing out by using
channel.pop_op.wait(1, [true, nil, nil])
.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.
163 164 165 |
# File 'lib-edge/concurrent/edge/channel.rb', line 163 def pop_op_matching(matcher, probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(matcher, probe, false) } end |
#push(message, timeout = nil) ⇒ self, true, false
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until the message is pushed into the channel.
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib-edge/concurrent/edge/channel.rb', line 117 def push(, timeout = nil) pushed_op = @Mutex.synchronize do return timeout ? true : self if ns_try_push() pushed = Promises.resolvable_future # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages @PendingPush.push , pushed pushed end result = pushed_op.wait!(timeout, [true, self, nil]) result == pushed_op ? self : result end |
#push_op(message) ⇒ ResolvableFuture(self)
Returns future which will fulfill when the message is pushed to the channel.
If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1)
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
pop_op = channel.pop_op
if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end
or the operation can be prevented from completion after timing out by using
channel.pop_op.wait(1, [true, nil, nil])
.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib-edge/concurrent/edge/channel.rb', line 98 def push_op() @Mutex.synchronize do if ns_try_push() Promises.fulfilled_future self else pushed = Promises.resolvable_future @PendingPush.push , pushed return pushed end end end |
#select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
275 276 277 |
# File 'lib-edge/concurrent/edge/channel.rb', line 275 def select(channels, timeout = nil) select_matching ANY, channels, timeout end |
#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
281 282 283 284 |
# File 'lib-edge/concurrent/edge/channel.rb', line 281 def select_matching(matcher, channels, timeout = nil) probe = select_op_matching(matcher, channels) probe.value!(timeout, nil, [true, nil, nil]) end |
#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels
the future is fulfilled with a channel message pair.
The returned channel is the origin of the message.
If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1)
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
pop_op = channel.pop_op
if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end
or the operation can be prevented from completion after timing out by using
channel.pop_op.wait(1, [true, nil, nil])
.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.
254 255 256 |
# File 'lib-edge/concurrent/edge/channel.rb', line 254 def select_op(channels, probe = Promises.resolvable_future) select_op_matching ANY, channels, probe end |
#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels
the future is fulfilled with a channel message pair.
The returned channel is the origin of the message.
If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1)
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
pop_op = channel.pop_op
if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end
or the operation can be prevented from completion after timing out by using
channel.pop_op.wait(1, [true, nil, nil])
.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.
260 261 262 263 |
# File 'lib-edge/concurrent/edge/channel.rb', line 260 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) [self, *channels].each { |ch| ch.partial_select_op matcher, probe } probe end |
#size ⇒ Integer
Returns The number of messages currently stored in the channel.
287 288 289 |
# File 'lib-edge/concurrent/edge/channel.rb', line 287 def size @Mutex.synchronize { @Messages.size } end |
#to_s ⇒ String Also known as: inspect
Returns Short string representation.
297 298 299 |
# File 'lib-edge/concurrent/edge/channel.rb', line 297 def to_s format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity end |
#try_pop(no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
135 136 137 |
# File 'lib-edge/concurrent/edge/channel.rb', line 135 def try_pop(no_value = nil) try_pop_matching ANY, no_value end |
#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
142 143 144 145 146 147 148 149 |
# File 'lib-edge/concurrent/edge/channel.rb', line 142 def try_pop_matching(matcher, no_value = nil) @Mutex.synchronize do = matcher return if != NOTHING = ns_consume_pending_push matcher return != NOTHING ? : no_value end end |
#try_push(message) ⇒ true, false
Push the message into the channel if there is space available.
74 75 76 |
# File 'lib-edge/concurrent/edge/channel.rb', line 74 def try_push() @Mutex.synchronize { ns_try_push() } end |
#try_select(channels) ⇒ ::Array(Channel, Object), nil
If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.
229 230 231 |
# File 'lib-edge/concurrent/edge/channel.rb', line 229 def try_select(channels) try_select_matching ANY, channels end |
#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object), nil
If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.
235 236 237 238 239 240 241 242 |
# File 'lib-edge/concurrent/edge/channel.rb', line 235 def try_select_matching(matcher, channels) = nil channel = [self, *channels].find do |ch| = ch.try_pop_matching(matcher, NOTHING) != NOTHING end channel ? [channel, ] : nil end |