"""
RPyC connection factories: ease the creation of a connection for the common
cases)
"""
from __future__ import with_statement
import socket
import threading
try:
from thread import interrupt_main
except ImportError:
try:
from _thread import interrupt_main
except ImportError:
# assume jython (#83)
from java.lang import System
interrupt_main = System.exit
from rpyc import Connection, Channel, SocketStream, TunneledSocketStream, PipeStream, VoidService
from rpyc.utils.registry import UDPRegistryClient
from rpyc.lib import safe_import
ssl = safe_import("ssl")
class DiscoveryError(Exception):
pass
#------------------------------------------------------------------------------
# API
#------------------------------------------------------------------------------
[docs]def connect_channel(channel, service = VoidService, config = {}):
"""creates a connection over a given channel
:param channel: the channel to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
return Connection(service, channel, config = config)
[docs]def connect_stream(stream, service = VoidService, config = {}):
"""creates a connection over a given stream
:param stream: the stream to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
return connect_channel(Channel(stream), service = service, config = config)
[docs]def connect_pipes(input, output, service = VoidService, config = {}):
"""
creates a connection over the given input/output pipes
:param input: the input pipe
:param output: the output pipe
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
return connect_stream(PipeStream(input, output), service = service, config = config)
[docs]def connect_stdpipes(service = VoidService, config = {}):
"""
creates a connection over the standard input/output pipes
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
return connect_stream(PipeStream.from_std(), service = service, config = config)
[docs]def connect(host, port, service = VoidService, config = {}, ipv6 = False, keepalive = False):
"""
creates a socket-connection to the given host and port
:param host: the hostname to connect to
:param port: the TCP port to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param ipv6: whether to use IPv6 or not
:returns: an RPyC connection
"""
s = SocketStream.connect(host, port, ipv6 = ipv6, keepalive = keepalive)
return connect_stream(s, service, config)
[docs]def unix_connect(path, service = VoidService, config = {}):
"""
creates a socket-connection to the given host and port
:param path: the path to the unix domain socket
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
s = SocketStream.unix_connect(path)
return connect_stream(s, service, config)
[docs]def ssl_connect(host, port, keyfile = None, certfile = None, ca_certs = None,
cert_reqs = None, ssl_version = None, ciphers = None,
service = VoidService, config = {}, ipv6 = False, keepalive = False):
"""
creates an SSL-wrapped connection to the given host (encrypted and
authenticated).
:param host: the hostname to connect to
:param port: the TCP port to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param ipv6: whether to create an IPv6 socket or an IPv4 one
The following arguments are passed directly to
`ssl.wrap_socket <http://docs.python.org/dev/library/ssl.html#ssl.wrap_socket>`_:
:param keyfile: see ``ssl.wrap_socket``. May be ``None``
:param certfile: see ``ssl.wrap_socket``. May be ``None``
:param ca_certs: see ``ssl.wrap_socket``. May be ``None``
:param cert_reqs: see ``ssl.wrap_socket``. By default, if ``ca_cert`` is specified,
the requirement is set to ``CERT_REQUIRED``; otherwise it is
set to ``CERT_NONE``
:param ssl_version: see ``ssl.wrap_socket``. The default is ``PROTOCOL_TLSv1``
:param ciphers: see ``ssl.wrap_socket``. May be ``None``. New in Python 2.7/3.2
:returns: an RPyC connection
"""
ssl_kwargs = {"server_side" : False}
if keyfile is not None:
ssl_kwargs["keyfile"] = keyfile
if certfile is not None:
ssl_kwargs["certfile"] = certfile
if ca_certs is not None:
ssl_kwargs["ca_certs"] = ca_certs
ssl_kwargs["cert_reqs"] = ssl.CERT_REQUIRED
if cert_reqs is not None:
ssl_kwargs["cert_reqs"] = cert_reqs
if ssl_version is None:
ssl_kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1
else:
ssl_kwargs["ssl_version"] = ssl_version
if ciphers is not None:
ssl_kwargs["ciphers"] = ciphers
s = SocketStream.ssl_connect(host, port, ssl_kwargs, ipv6 = ipv6, keepalive = keepalive)
return connect_stream(s, service, config)
def _get_free_port():
"""attempts to find a free port"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("localhost", 0))
_, port = s.getsockname()
s.close()
return port
_ssh_connect_lock = threading.Lock()
[docs]def ssh_connect(remote_machine, remote_port, service = VoidService, config = {}):
"""
Connects to an RPyC server over an SSH tunnel (created by plumbum).
See `Plumbum tunneling <http://plumbum.readthedocs.org/en/latest/remote.html#tunneling>`_
for further details.
.. note::
This function attempts to allocate a free TCP port for the underlying tunnel, but doing
so is inherently prone to a race condition with other processes who might bind the
same port before sshd does. Albeit unlikely, there is no sure way around it.
:param remote_machine: an :class:`plumbum.remote.RemoteMachine` instance
:param remote_port: the port of the remote server
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:returns: an RPyC connection
"""
with _ssh_connect_lock:
loc_port = _get_free_port()
tun = remote_machine.tunnel(loc_port, remote_port)
stream = TunneledSocketStream.connect("localhost", loc_port)
stream.tun = tun
return Connection(service, Channel(stream), config = config)
[docs]def discover(service_name, host = None, registrar = None, timeout = 2):
"""
discovers hosts running the given service
:param service_name: the service to look for
:param host: limit the discovery to the given host only (None means any host)
:param registrar: use this registry client to discover services. if None,
use the default UDPRegistryClient with the default settings.
:param timeout: the number of seconds to wait for a reply from the registry
if no hosts are found, raises DiscoveryError
:raises: ``DiscoveryError`` if no server is found
:returns: a list of (ip, port) pairs
"""
if registrar is None:
registrar = UDPRegistryClient(timeout = timeout)
addrs = registrar.discover(service_name)
if not addrs:
raise DiscoveryError("no servers exposing %r were found" % (service_name,))
if host:
ips = socket.gethostbyname_ex(host)[2]
addrs = [(h, p) for h, p in addrs if h in ips]
if not addrs:
raise DiscoveryError("no servers exposing %r were found on %r" % (service_name, host))
return addrs
[docs]def connect_by_service(service_name, host = None, service = VoidService, config = {}):
"""create a connection to an arbitrary server that exposes the requested service
:param service_name: the service to discover
:param host: limit discovery to the given host only (None means any host)
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:raises: ``DiscoveryError`` if no server is found
:returns: an RPyC connection
"""
# The registry server may have multiple services registered for the same service name,
# some of which could be dead. We iterate over the list returned and return the first
# one we could connect to. If none of the registered servers is responsive we re-throw
# the exception
addrs = discover(service_name, host = host)
for host, port in addrs:
try:
return connect(host, port, service, config = config)
except socket.error:
pass
raise DiscoveryError("All services are down: %s" % (addrs,))
[docs]def connect_subproc(args, service = VoidService, config = {}):
"""runs an rpyc server on a child process that and connects to it over
the stdio pipes. uses the subprocess module.
:param args: the args to Popen, e.g., ["python", "-u", "myfile.py"]
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
"""
from subprocess import Popen, PIPE
proc = Popen(args, stdin = PIPE, stdout = PIPE)
conn = connect_pipes(proc.stdout, proc.stdin, service = service, config = config)
conn.proc = proc # just so you can have control over the processs
return conn
[docs]def connect_thread(service = VoidService, config = {}, remote_service = VoidService, remote_config = {}):
"""starts an rpyc server on a new thread, bound to an arbitrary port,
and connects to it over a socket.
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param server_service: the remote service to expose (of the server; defaults to Void)
:param server_config: remote configuration dict (of the server)
"""
listener = socket.socket()
listener.bind(("localhost", 0))
listener.listen(1)
def server(listener = listener):
client = listener.accept()[0]
listener.close()
conn = connect_stream(SocketStream(client), service = remote_service,
config = remote_config)
try:
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()
t = threading.Thread(target = server)
t.setDaemon(True)
t.start()
host, port = listener.getsockname()
return connect(host, port, service = service, config = config)
[docs]def connect_multiprocess(service = VoidService, config = {}, remote_service = VoidService, remote_config = {}, args={}):
"""starts an rpyc server on a new process, bound to an arbitrary port,
and connects to it over a socket. Basically a copy of connect_thread().
However if args is used and if these are shared memory then changes
will be bi-directional. That is we now have access to shared memmory.
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param server_service: the remote service to expose (of the server; defaults to Void)
:param server_config: remote configuration dict (of the server)
:param args: dict of local vars to pass to new connection, form {'name':var}
Contributed by *@tvanzyl*
"""
from multiprocessing import Process
listener = socket.socket()
listener.bind(("localhost", 0))
listener.listen(1)
def server(listener=listener, args=args):
client = listener.accept()[0]
listener.close()
conn = connect_stream(SocketStream(client), service = remote_service, config = remote_config)
try:
for k in args:
conn._local_root.exposed_namespace[k] = args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()
t = Process(target = server)
t.start()
host, port = listener.getsockname()
return connect(host, port, service = service, config = config)