Skip to content

0x1eef/xchan.rb

Repository files navigation

About

xchan.rb is an easy to use library that provides a channel for InterProcess Communication (IPC) between a parent Ruby process, and its child processes. The channel is implemented through both serialization, and an unbound unix socket.

Examples

Serialization

Options

When a channel is written to or read from, a Ruby object is serialized (on write) or deserialized (on read). The default serializers are available as xchan(:marshal), xchan(:json), and xchan(:yaml). For scenarios where it is preferred to send and receive plain strings, the "plain" serializer is available as xchan(:plain). This example uses Marshal:

require "xchan"

##
# This channel uses Marshal to serialize objects.
ch = xchan
pid = fork { print "Received message: ", ch.recv[:msg], "\n" }
ch.send(msg: "serialized by Marshal")
ch.close
Process.wait(pid)

##
# This channel also uses Marshal to serialize objects.
ch = xchan(:marshal)
pid = fork { print "Received message: ", ch.recv[:msg], "\n"
ch.send(msg: "serialized by Marshal")
ch.close
Process.wait(pid)

##
# Received message: serialized by Marshal
# Received message: serialized by Marshal

Read operations

#recv

The ch.recv method performs a blocking read. A read can block when a lock is held by another process, or when a read from the underlying socket blocks. This example performs a read that blocks in a child process until the parent process writes to the channel:

require "xchan"

ch = xchan
pid = fork do
  print "Received a random number (child process): ", ch.recv, "\n"
end
# Delay for a second to let a process fork, and call "ch.recv"
sleep(1)
print "Send a random number (from parent process)", "\n"
ch.send(rand(21))
Process.wait(pid)
ch.close

##
# Send a random number (from parent process)
# Received random number (child process): XX

#recv_nonblock

The non-blocking counterpart to #recv is #recv_nonblock. The #recv_nonblock method raises Chan::WaitLockable when a read blocks because of a lock held by another process, and the method raises Chan::WaitReadable when a read on the underlying socket blocks. This example performs a read that will raise Chan::WaitReadable:

require "xchan"

def read(ch)
  ch.recv_nonblock
rescue Chan::WaitReadable
  print "Wait 1 second for channel to be readable", "\n"
  ch.wait_readable(1)
  retry
rescue Chan::WaitLockable
  sleep 0.01
  retry
end
trap("SIGINT") { exit(1) }
read(xchan)

##
# Wait 1 second for channel to be readable
# Wait 1 second for channel to be readable
# ^C

Write operations

#send

The ch.send method performs a blocking write. A write can block when a lock is held by another process, or when a write to the underlying socket blocks. This example performs a write that will block when the send buffer becomes full:

require "xchan"

ch = xchan(:marshal, sock_type: Socket::SOCK_STREAM)
sndbuf = ch.getsockopt(:reader, Socket::SOL_SOCKET, Socket::SO_SNDBUF)
while ch.bytes_sent <= sndbuf.int
  ch.send(1)
end

#send_nonblock

The non-blocking counterpart to #send is #send_nonblock. The #send_nonblock method raises Chan::WaitLockable when a write blocks because of a lock held by another process, and the method raises Chan::WaitReadable when a write to the underlying socket blocks. This example builds on the previous example by freeing space on the send buffer when a write is found to block:

require "xchan"

def send_nonblock(ch, buf)
  ch.send_nonblock(buf)
rescue Chan::WaitWritable
  print "Blocked - free send buffer", "\n"
  ch.recv
  retry
rescue Chan::WaitLockable
  sleep 0.01
  retry
end

ch = xchan(:marshal, sock_type: Socket::SOCK_STREAM)
sndbuf = ch.getsockopt(:writer, Socket::SOL_SOCKET, Socket::SO_SNDBUF)
while ch.bytes_sent <= sndbuf.int
  send_nonblock(ch, 1)
end

##
# Blocked - free send buffer

Socket

Types

A channel can be created with one of three sockets types:

  • Socket::SOCK_DGRAM
  • Socket::SOCK_STREAM
  • Socket::SOCK_SEQPACKET

The default is Socket::SOCK_DGRAM because its default settings provide the most buffer space. The socket type can be specified as a keyword argument:

require "xchan"
ch = xchan(:marshal, sock_type: Socket::SOCK_STREAM)

Options

A channel is composed of two sockets, one for reading and the other for writing. Socket options can be read and set on either of the two sockets with the Chan::UNIXSocket#getsockopt, and Chan::UNIXSocket#setsockopt methods. Besides the first argument (:reader, or :writer), the rest of the arguments are identical to Socket#{getsockopt,setsockopt}. This example's results can vary depending on the operating system it is run on:

require "xchan"
ch = xchan(:marshal)

##
# Print the value of SO_RCVBUF
rcvbuf = ch.getsockopt(:reader, Socket::SOL_SOCKET, Socket::SO_RCVBUF)
print "The read buffer can contain a maximum of: ", rcvbuf.int, " bytes.\n"

##
# Print the value of SO_SNDBUF
sndbuf = ch.getsockopt(:writer, Socket::SOL_SOCKET, Socket::SO_SNDBUF)
print "The maximum size of a single message is: ", sndbuf.int, " bytes.\n"

##
# The read buffer can contain a maximum of: 16384 bytes.
# The maximum size of a single message is: 2048 bytes.

Temporary files

tmpdir

A single channel creates three temporary files that are removed from the filesystem as soon as they are created. By default the files are stored - for a short time - in Dir.tmpdir. Read and write permissions are reserved for the process that created them, inclusive of its child processes.

The parent directory of the temporary files can be changed with the tmpdir option:

require "xchan"
ch = xchan(:marshal, tmpdir: Dir.home)

Sources

Install

Git

xchan.rb is distributed as a RubyGem through its git repositories.
GitHub, and GitLab are available as sources.

# Gemfile
gem "xchan.rb", github: "0x1eef/xchan.rb", tag: "v0.16.5"

Rubygems.org

xchan.rb can also be installed via rubygems.org.

gem install xchan.rb

BSD Zero Clause.
See LICENSE.

About

An easy to use InterProcess Communication (IPC) library.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages