# -*- coding: utf-8 -*-
# *********************************************************************
# lewis - a library for creating hardware device simulators
# Copyright (C) 2016-2021 European Spallation Source ERIC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# *********************************************************************
import asynchat
import asyncore
import inspect
import re
import socket
from typing import NoReturn
from scanf import scanf_compile
from lewis.core.adapters import Adapter
from lewis.core.devices import InterfaceBase
from lewis.core.logging import has_log
from lewis.core.utils import format_doc_text
@has_log
class StreamHandler(asynchat.async_chat):
    """
    Serves one client connection that was accepted by a :class:`StreamServer`.

    Incoming data is buffered until the target's ``in_terminator`` is seen or, when no
    terminator is configured, until the read timeout expires. The collected request is
    then handed to the first of the target's ``bound_commands`` that can process it and
    the reply (if any) is pushed back to the client with ``out_terminator`` appended.

    :param sock: Connected client socket.
    :param target: Object providing ``in_terminator``, ``out_terminator``, ``readtimeout``,
        ``bound_commands`` and ``handle_error``.
    :param stream_server: Server owning this handler; its ``device_lock`` guards command
        execution and its ``remove_handler`` is called when the connection closes.
    """

    def __init__(self, sock, target, stream_server) -> None:
        asynchat.async_chat.__init__(self, sock=sock)
        self.set_terminator(target.in_terminator.encode())

        self._readtimeout = target.readtimeout
        self._readtimer = 0

        self._target = target
        self._buffer = []

        self._stream_server = stream_server
        self._target.handler = self

        self._set_logging_context(target)

        # Remember the peer address now. Once the connection has been dropped,
        # getpeername() raises OSError, which previously prevented handle_close()
        # from deregistering and closing the handler.
        try:
            self._peername = sock.getpeername()
        except OSError:
            self._peername = ("<unknown>", "<unknown>")

        self.log.info("Client connected from %s:%s", *self._peername)

    def process(self, msec) -> None:
        """
        Advance the read timer and react to an expired read timeout.

        :param msec: Milliseconds that have passed since the previous call.
        """
        if not self._buffer:
            return

        if self._readtimer >= self._readtimeout and self._readtimeout != 0:
            if not self.get_terminator():
                # If no terminator is set, this timeout is the terminator
                self.found_terminator()
            else:
                # A terminator was expected but never arrived: discard the partial
                # request and let the target decide on an error reply.
                self._readtimer = 0
                request = self._get_request()
                with self._stream_server.device_lock:
                    error = RuntimeError("ReadTimeout while waiting for command terminator.")
                    reply = self._handle_error(request, error)
                self._send_reply(reply)

        # Only count time while a partial request is still pending.
        if self._buffer:
            self._readtimer += msec

    def collect_incoming_data(self, data) -> None:
        """Buffer a received chunk and restart the read timer."""
        self._buffer.append(data)
        self._readtimer = 0

    def _get_request(self):
        """Join and clear the buffered chunks, returning the request as bytes."""
        request = b"".join(self._buffer)
        self._buffer = []

        self.log.debug("Got request %s", request)

        return request

    def _push(self, reply) -> None:
        """Encode ``reply`` if it is a string, append the out-terminator and send it."""
        try:
            if isinstance(reply, str):
                reply = reply.encode()

            out_terminator = self._target.out_terminator
            if isinstance(out_terminator, str):
                out_terminator = out_terminator.encode()

            self.push(reply + out_terminator)
        except TypeError as e:
            # Raised when reply is neither str nor bytes (concatenation fails).
            self.log.error("Problem creating reply, type error %s!", e)

    def _send_reply(self, reply) -> None:
        """Send ``reply`` to the client unless it is ``None``."""
        if reply is not None:
            self.log.debug("Sending reply %s", reply)
            self._push(reply)

    def _handle_error(self, request, error):
        """Log ``error`` and return whatever the target's ``handle_error`` returns."""
        self.log.debug("Error while processing request", exc_info=error)
        return self._target.handle_error(request, error)

    def found_terminator(self) -> None:
        """Dispatch the buffered request to the first matching bound command."""
        self._readtimer = 0
        request = self._get_request()

        with self._stream_server.device_lock:
            try:
                cmd = next(
                    (cmd for cmd in self._target.bound_commands if cmd.can_process(request)),
                    None,
                )

                if cmd is None:
                    raise RuntimeError("None of the device's commands matched.")

                self.log.info(
                    "Processing request %s using command %s",
                    request,
                    cmd.matcher.pattern,
                )

                reply = cmd.process_request(request)

            except Exception as error:
                # Any failure is delegated to the target, which may supply a reply.
                reply = self._handle_error(request, error)

        self._send_reply(reply)

    def unsolicited_reply(self, reply) -> None:
        """Send ``reply`` to the client without a preceding request."""
        self.log.debug("Sending unsolicited reply %s", reply)
        self._push(reply)

    def handle_close(self) -> None:
        """Deregister this handler from the server and close the connection."""
        # Use the cached address; the socket may no longer be connected here.
        self.log.info("Closing connection to client %s:%s", *self._peername)
        self._stream_server.remove_handler(self)
        asynchat.async_chat.handle_close(self)
@has_log
class StreamServer(asyncore.dispatcher):
    """
    Listening TCP socket that creates a :class:`StreamHandler` for each accepted client.

    :param host: Address to bind the listening socket to.
    :param port: Port to listen on.
    :param target: Object passed on to every :class:`StreamHandler` as its target.
    :param device_lock: Lock exposed as ``device_lock`` for use by the handlers.
    """

    def __init__(self, host, port, target, device_lock) -> None:
        asyncore.dispatcher.__init__(self)
        self.target = target
        self.device_lock = device_lock
        self._accepted_connections = []

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

        self._set_logging_context(target)
        self.log.info("Listening on %s:%s", host, port)

    def handle_accept(self) -> None:
        """Accept a pending connection and keep track of its handler."""
        pair = self.accept()
        if pair is not None:
            sock, _ = pair
            handler = StreamHandler(sock, self.target, self)
            self._accepted_connections.append(handler)

    def remove_handler(self, handler) -> None:
        """Forget ``handler``; called by the handler when its connection closes."""
        self._accepted_connections.remove(handler)

    def close(self) -> None:
        """Close the listening socket and all remaining client connections."""
        # The base class method is called directly. This is important to still
        # perform all the teardown-work that asyncore.dispatcher does.
        self.log.info("Shutting down server, closing all remaining client connections.")
        asyncore.dispatcher.close(self)

        # But in addition, close all open sockets and clear the connection list.
        for handler in list(self._accepted_connections):
            handler.close()

        self._accepted_connections = []

    def process(self, msec) -> None:
        """
        Forward the elapsed time to every connected handler.

        :param msec: Milliseconds that have passed since the previous call.
        """
        # Iterate over a snapshot: sending a reply to a disconnected client makes
        # the handler call remove_handler(), which would otherwise mutate the list
        # during iteration and cause the following handler to be skipped.
        for handler in list(self._accepted_connections):
            handler.process(msec)
class PatternMatcher:
    """
    Interface for command matchers that decide, by whatever technique, whether a request
    matches a pattern. :class:`Func` relies on it to find out if a request can be handled
    by a function and to extract that function's arguments from the request.

    Sub-classes must implement all abstract methods and properties defined here.

    .. seealso::

        :class:`regex`, :class:`scanf` are concrete implementations of this class.
    """

    def __init__(self, pattern) -> None:
        self._pattern = pattern

    @property
    def pattern(self):
        """The pattern definition this matcher was constructed with."""
        return self._pattern

    @property
    def arg_count(self) -> NoReturn:
        """Number of arguments that a matching request yields."""
        message = "The arg_count property must be implemented."
        raise NotImplementedError(message)

    @property
    def argument_mappings(self) -> NoReturn:
        """Mapping functions applicable to the arguments returned by :meth:`match`."""
        message = "The argument_mappings property must be implemented."
        raise NotImplementedError(message)

    def match(self, request) -> NoReturn:
        """
        Try to match ``request`` against the stored pattern and return the matched
        function arguments.

        :param request: Request to attempt matching.
        :return: List of matched argument values (possibly empty) or None if not matching.
        """
        message = "The match-method must be implemented."
        raise NotImplementedError(message)
class regex(PatternMatcher):
    """
    :class:`PatternMatcher` implementation backed by a compiled regular expression.

    The pattern string is encoded to bytes before compilation, so requests passed to
    :meth:`match` are matched as bytes.
    """

    def __init__(self, pattern) -> None:
        super().__init__(pattern)
        encoded_pattern = pattern.encode()
        self.compiled_pattern = re.compile(encoded_pattern)

    @property
    def arg_count(self):
        """Number of groups defined in the compiled expression."""
        return self.compiled_pattern.groups

    @property
    def argument_mappings(self) -> None:
        """Plain regular expressions do not provide argument mappings."""
        return None

    def match(self, request):
        """
        Match ``request`` from its beginning against the compiled expression.

        :param request: Request to attempt matching.
        :return: Tuple with the matched groups, or None if the request does not match.
        """
        found = self.compiled_pattern.match(request)
        return found.groups() if found is not None else None
class scanf(regex):
    """
    Treats the given pattern as a scanf format. The scanf_ package converts the format
    into a regular expression internally; see the documentation of scanf_ for the
    supported pattern specifications.

    The generated regular expression matches exactly by default. Consider this example:

    .. sourcecode:: Python

        exact = scanf("T=%f")
        not_exact = scanf("T=%f", exact_match=False)

    The first pattern only matches the string ``T=4.0``, whereas the second would also match
    ``T=4.0garbage``. Note that specifiers such as ``%f`` automatically become groups in
    the generated regular expression.

    :param pattern: Scanf format specification.
    :param exact_match: Match only if the entire string matches.

    .. _scanf: https://github.com/joshburnett/scanf
    """

    def __init__(self, pattern, exact_match=True) -> None:
        self._scanf_pattern = pattern

        compiled, mappings = scanf_compile(pattern)
        self._argument_mappings = mappings

        # Anchor the generated expression on both ends when an exact match is requested.
        source = compiled.pattern
        anchored = f"^{source}$" if exact_match else source

        super().__init__(anchored)

    @property
    def pattern(self):
        """The original scanf format, not the regular expression generated from it."""
        return self._scanf_pattern

    @property
    def argument_mappings(self):
        """Conversion functions that ``scanf_compile`` returned for the format."""
        return self._argument_mappings
class Func:
    """
    Objects of this type connect a callable object to a pattern matcher (:class:`PatternMatcher`),
    which currently comprises :class:`regex` and :class:`scanf`. Strings are also
    accepted, they are treated like a regular expression internally. This preserves default
    behavior from older versions of Lewis.

    In general, Func-objects should not be created directly, instead they are created by one of
    the sub-classes of :class:`CommandBase` using :meth:`~CommandBase.bind`.

    Function arguments are indicated by groups in the regular expression. The number of
    groups has to match the number of arguments of the function. In earlier versions of Lewis it
    was possible to pass flags to ``re.compile``, this has been removed for consistency issues
    in :class:`Var`. It is however still possible to use the exact same flags as part of the
    regular expression. In the documentation of re_, this is outlined, simply add a group to the
    expression that contains the flags, for example ``(?i)`` to make the expression case
    insensitive. This special group does not count towards the matching groups used for argument
    capture.

    The optional argument_mappings can be an iterable of callables with one parameter of the
    same length as the number of arguments of the function. The first parameter will be
    transformed using the first function, the second using the second function and so on.
    This can be useful to automatically transform strings provided by the adapter into a proper
    data type such as ``int`` or ``float`` before they are passed to the function. In case the
    pattern is of type :class:`scanf`, this is optional (but will override the mappings
    provided by the matcher).

    The return_mapping argument is similar, it should map the return value of the function
    to a string. It can also be set to a numeric value or a string constant so that the
    command always returns the same value. If it is ``None``, the return value is not
    modified at all.

    Finally, documentation can be provided by passing the doc-argument. If it is omitted,
    the docstring of the bound function is used and if that is not present, left empty.

    :param func: Function to be called when pattern matches or member of device/interface.
    :param pattern: :class:`regex`, :class:`scanf` object or string.
    :param argument_mappings: Iterable with mapping functions from string to some type.
    :param return_mapping: Mapping function for return value of method.
    :param doc: Description of the command. If not supplied, the docstring is used.

    :raises: RuntimeError: If the function cannot be mapped for any reason.

    .. _re: https://docs.python.org/3/library/re.html#regular-expression-syntax
    """

    def __init__(
        self, func, pattern, argument_mappings=None, return_mapping=None, doc=None
    ) -> None:
        if not callable(func):
            raise RuntimeError("Can not construct a Func-object from a non callable object.")

        self.func = func

        func_name = getattr(func, "__name__", repr(func))

        # Plain strings are interpreted as regular expressions.
        if isinstance(pattern, str):
            try:
                pattern = regex(pattern)
            except re.error as e:
                raise RuntimeError(
                    f"The pattern '{pattern}' for function '{func_name}' is invalid regex: {e}"
                ) from e

        self.matcher = pattern

        # Fall back to the mappings supplied by the matcher (e.g. scanf), if any.
        if argument_mappings is None:
            argument_mappings = self.matcher.argument_mappings or None

        # Verify that the function can be called with as many positional
        # arguments as the pattern captures.
        try:
            inspect.getcallargs(func, *[None] * self.matcher.arg_count)
        except TypeError as e:
            raise RuntimeError(
                "The number of arguments for function '{}' matched by pattern "
                "'{}' is not compatible with number of defined "
                "groups in pattern ({}).".format(
                    func_name,
                    self.matcher.pattern,
                    self.matcher.arg_count,
                )
            ) from e

        if argument_mappings is not None and (self.matcher.arg_count != len(argument_mappings)):
            # Report the pattern itself; formatting the matcher object would only
            # show its repr, which is of no use for locating the offending command.
            raise RuntimeError(
                "Supplied argument mappings for function matched by pattern '{}' specify {} "
                "argument(s), but the function has {} arguments.".format(
                    self.matcher.pattern, len(argument_mappings), self.matcher.arg_count
                )
            )

        self.argument_mappings = argument_mappings
        self.return_mapping = return_mapping

        # func is known to be callable at this point.
        self.doc = doc or inspect.getdoc(self.func)

    def can_process(self, request):
        """Return True if ``request`` matches the pattern of this object."""
        return self.matcher.match(request) is not None

    def process_request(self, request):
        """
        Call the bound function with the arguments extracted from ``request``.

        :param request: Request to process.
        :return: Mapped return value of the bound function.
        :raises: RuntimeError: If the request does not match the pattern.
        """
        match = self.matcher.match(request)

        if match is None:
            raise RuntimeError("Request can not be processed.")

        args = self.map_arguments(match)

        return self.map_return_value(self.func(*args))

    def map_arguments(self, arguments):
        """
        Returns the mapped function arguments. If no mapping functions are defined, the arguments
        are returned as they were supplied.

        :param arguments: List of arguments for bound function as strings.
        :return: Mapped arguments.
        """
        if self.argument_mappings is None:
            return arguments

        return [f(a) for f, a in zip(self.argument_mappings, arguments)]

    def map_return_value(self, return_value):
        """
        Returns the mapped return_value of a processed request. If no return_mapping has been
        defined, the value is returned as is. If return_mapping is a static value, that value
        is returned, ignoring return_value completely.

        :param return_value: Value to map.
        :return: Mapped return value.
        """
        if callable(self.return_mapping):
            return self.return_mapping(return_value)

        if self.return_mapping is not None:
            return self.return_mapping

        return return_value
class CommandBase:
    """
    Common base class of :class:`Cmd` and :class:`Var`. Commands of the stream adapter
    work by connecting a callable object to a pattern that matches an inbound request.

    The pattern is either an implementation of :class:`PatternMatcher` (regex or scanf
    format specification) or a plain string, which is treated as a regular expression.

    For free functions and lambda expressions the function object can simply be stored
    together with the pattern. Most often, however, the callable is a method of the device
    or interface object, and those do not exist yet when the commands are defined.

    This is solved by a "bind"-step in :class:`StreamAdapter`: instead of a function object,
    :class:`Cmd` and :class:`Var` store the name of a member of device or interface, which
    is translated into the correct callable at "bind-time".

    So rather than being used directly, both classes' :meth:`bind`-methods return an iterable
    of :class:`Func`-objects which can be used for processing requests. For details regarding
    the implementations, please see the corresponding classes.

    .. seealso::

        Please take a look at :class:`Cmd` for exposing callable objects or methods of
        device/interface and :class:`Var` for exposing attributes and properties.

        To see how argument_mappings, return_mapping and doc are applied, please look at
        :class:`Func`.

    :param func: Function to be called when pattern matches or member of device/interface.
    :param pattern: Pattern to match (:class:`PatternMatcher` or string).
    :param argument_mappings: Iterable with mapping functions from string to some type.
    :param return_mapping: Mapping function for return value of method.
    :param doc: Description of the command. If not supplied, the docstring is used.
    """

    def __init__(
        self, func, pattern, argument_mappings=None, return_mapping=None, doc=None
    ) -> None:
        super().__init__()

        # Everything is stored verbatim; interpretation happens at bind-time.
        self.func, self.pattern = func, pattern
        self.argument_mappings, self.return_mapping = argument_mappings, return_mapping
        self.doc = doc

    def bind(self, target) -> NoReturn:
        """Abstract: sub-classes turn the stored definition into Func-objects for ``target``."""
        message = "Binders need to implement the bind method."
        raise NotImplementedError(message)
class Cmd(CommandBase):
    """
    Implementation of :class:`CommandBase` that exposes either a callable object or a named
    method of the device/interface controlled by :class:`StreamAdapter`.

    .. sourcecode:: Python

        def random():
            return 6

        class SomeInterface(StreamInterface):
            commands = {
                Cmd(lambda: 4, pattern='^R$', doc='Returns a random number.'),
                Cmd('random', pattern='^RR$', doc='Better random number.'),
                Cmd(random, pattern='^RRR$', doc='The best random number.'),
            }

            def random(self):
                return 5

    The interface defined by the above example has three commands, ``R`` which calls a lambda
    function that always returns 4, ``RR``, which calls ``SomeInterface.random`` and returns 5 and
    lastly ``RRR`` which calls the free function defined above and returns the best random number.

    The constructor arguments are forwarded to :class:`Func`; please refer to its documentation
    for a detailed explanation of their requirements. By default the return value is converted
    with ``str`` unless it is ``None``.

    .. seealso ::

        :class:`Var` exposes attributes and properties of a device object. The documentation
        of :class:`Func` provides more information about the common constructor arguments.

    :param func: Function to be called when pattern matches or member of device/interface.
    :param pattern: Pattern to match (:class:`PatternMatcher` or string).
    :param argument_mappings: Iterable with mapping functions from string to some type.
    :param return_mapping: Mapping function for return value of method.
    :param doc: Description of the command. If not supplied, the docstring is used.
    """

    def __init__(
        self,
        func,
        pattern,
        argument_mappings=None,
        return_mapping=lambda x: None if x is None else str(x),
        doc=None,
    ) -> None:
        super().__init__(func, pattern, argument_mappings, return_mapping, doc)

    def bind(self, target):
        """
        Resolve the stored callable or member name against ``target``.

        :param target: Object on which a member name is looked up.
        :return: List with a single :class:`Func`, or None if ``target`` lacks the member.
        """
        if callable(self.func):
            # Free functions and lambdas are used as they are.
            resolved = self.func
        else:
            resolved = getattr(target, self.func, None)

        if resolved is None:
            return None

        bound = Func(
            resolved,
            self.pattern,
            argument_mappings=self.argument_mappings,
            return_mapping=self.return_mapping,
            doc=self.doc,
        )
        return [bound]
class Var(CommandBase):
    r"""
    With this implementation of :class:`CommandBase` it's possible to expose plain data attributes
    or properties of device or interface. Getting and setting a value are separate procedures
    which both have their own pattern, read_pattern and write_pattern to match a command each.
    Please note that write_pattern has to have exactly one group defined to match a parameter.

    Due to this separation, parameters can be made read-only, write-only or read-write in the
    interface:

    .. sourcecode:: Python

        class SomeInterface(StreamInterface):
            commands = {
                Var('foo', read_pattern='^F$', write_pattern=r'^F=(\d+)$',
                    argument_mappings=(int,), doc='An integer attribute.'),
                Var('bar', read_pattern='^B$')
            }

            foo = 10

            @property
            def bar(self):
                return self.foo + 5

            @bar.setter
            def bar(self, new_bar):
                self.foo = new_bar - 5

    In the above example, the foo attribute can be read and written, it's automatically converted
    to an integer, while bar is a property that can only be read via the stream protocol.

    .. seealso::

        For exposing methods and free functions, there's the :class:`Cmd`-class.

    :param target_member: Attribute or property of device/interface to expose.
    :param read_pattern: Pattern to match for getter (:class:`PatternMatcher` or string).
    :param write_pattern: Pattern to match for setter (:class:`PatternMatcher` or string).
    :param argument_mappings: Iterable with mapping functions from string to some type,
                              only applied to setter.
    :param return_mapping: Mapping function for return value of method,
                           applied to getter and setter.
    :param doc: Description of the command. If not supplied, the docstring is used. For plain data
                attributes the only way to get docs is to supply this argument.
    """

    def __init__(
        self,
        target_member,
        read_pattern=None,
        write_pattern=None,
        argument_mappings=None,
        return_mapping=lambda x: None if x is None else str(x),
        doc=None,
    ) -> None:
        super(Var, self).__init__(target_member, None, argument_mappings, return_mapping, doc)

        self.target = None

        self.read_pattern = read_pattern
        self.write_pattern = write_pattern

    def _property_doc(self, target, prefix):
        """Return ``prefix`` plus the docstring of the exposed property, or None if there is none."""
        prop = getattr(type(target), self.func, None)

        if not inspect.isdatadescriptor(prop):
            return None

        # inspect.getdoc returns None for an undocumented property; concatenating
        # that with the prefix used to raise TypeError during binding.
        doc = inspect.getdoc(prop)

        return None if doc is None else prefix + doc

    def bind(self, target):
        """
        Create getter and/or setter functions for the exposed member of ``target``.

        :param target: Object that owns the attribute or property.
        :return: List of :class:`Func`-objects (one per defined pattern), or None if
                 ``target`` does not have the member.
        """
        if self.func not in dir(target):
            return None

        funcs = []

        if self.read_pattern is not None:

            def getter():
                return getattr(target, self.func)

            # Copy docstring if target is a documented @property
            getter_doc = self._property_doc(target, "Getter: ")
            if getter_doc is not None:
                getter.__doc__ = getter_doc

            funcs.append(
                Func(
                    getter,
                    self.read_pattern,
                    return_mapping=self.return_mapping,
                    doc=self.doc,
                )
            )

        if self.write_pattern is not None:

            def setter(new_value) -> None:
                setattr(target, self.func, new_value)

            # Copy docstring if target is a documented @property
            setter_doc = self._property_doc(target, "Setter: ")
            if setter_doc is not None:
                setter.__doc__ = setter_doc

            funcs.append(
                Func(
                    setter,
                    self.write_pattern,
                    argument_mappings=self.argument_mappings,
                    return_mapping=self.return_mapping,
                    doc=self.doc,
                )
            )

        return funcs
class StreamAdapter(Adapter):
    """
    The StreamAdapter is the bridge between the Device Interface and the TCP Stream networking
    backend implementation.

    Available adapter options are:

     - bind_address: IP of network adapter to bind on (defaults to 0.0.0.0, or all adapters)
     - port: Port to listen on (defaults to 9999)
     - telnet_mode: When True, overrides in- and out-terminator for CRNL (defaults to False)

    :param options: Dictionary with options.
    """

    default_options = {"telnet_mode": False, "bind_address": "0.0.0.0", "port": 9999}

    def __init__(self, options=None) -> None:
        super(StreamAdapter, self).__init__(options)
        self._server = None

    @property
    def documentation(self):
        """Text describing the interface, the connection parameters and all bound commands."""
        commands = [
            "{}:\n{}".format(
                cmd.matcher.pattern,
                format_doc_text(cmd.doc or inspect.getdoc(cmd.func) or ""),
            )
            for cmd in sorted(self.interface.bound_commands, key=lambda x: x.matcher.pattern)
        ]

        options = format_doc_text(
            "Listening on: {}\nPort: {}\nRequest terminator: {}\nReply terminator: {}".format(
                self._options.bind_address,
                self._options.port,
                repr(self.interface.in_terminator),
                repr(self.interface.out_terminator),
            )
        )

        return "\n\n".join(
            [
                inspect.getdoc(self.interface) or "",
                "Parameters\n==========",
                options,
                "Commands\n========",
            ]
            + commands
        )

    def start_server(self) -> None:
        """
        Starts the TCP stream server, binding to the configured host and port.
        Host and port are taken from the adapter options.

        .. note:: The server does not process requests unless
                  :meth:`handle` is called in regular intervals.
        """
        if self._server is None:
            if self._options.telnet_mode:
                self.interface.in_terminator = "\r\n"
                self.interface.out_terminator = "\r\n"

            self._server = StreamServer(
                self._options.bind_address,
                self._options.port,
                self.interface,
                self.device_lock,
            )

    def stop_server(self) -> None:
        """Close the server, if one is running, and forget it."""
        if self._server is not None:
            self._server.close()
            self._server = None

    @property
    def is_running(self):
        """True while a server object exists, i.e. between start_server and stop_server."""
        return self._server is not None

    def handle(self, cycle_delay=0.1) -> None:
        """
        Spend approximately ``cycle_delay`` seconds to process requests to the server.

        :param cycle_delay: Seconds to wait for network activity; also the amount of time
            (converted to milliseconds) by which the handlers' read timers are advanced.
        """
        asyncore.loop(cycle_delay, count=1)

        # Without a running server there are no handlers whose timers need advancing;
        # dereferencing None here used to raise AttributeError.
        if self._server is not None:
            self._server.process(int(cycle_delay * 1000))
class StreamInterface(InterfaceBase):
    r"""
    This class is used to provide a TCP-stream based interface to a device.

    Many hardware devices use a protocol that is based on exchanging text with a client via
    a TCP stream. Sometimes RS232-based devices are also exposed this way via an adapter-box.
    This adapter makes it easy to mimic such a protocol.

    This class has the following attributes which may be overridden by subclasses:

     - protocol: What this interface is called for purposes of the -p commandline option.
       Defaults to "stream".
     - in_terminator, out_terminator: These define how lines are terminated when transferred
       to and from the device respectively. They are stripped/added automatically.
       Inverse of protocol file InTerminator and OutTerminator. The default is ``\r``.
     - readtimeout: How many msec to wait for additional data between packets, once transmission
       of an incoming command has begun. Inverse of ReadTimeout in protocol files.
       Defaults to 100 (ms). Set to 0 to disable timeout completely.
     - commands: A list of :class:`~CommandBase`-objects that define mappings between protocol
       and device/interface methods/attributes.

    By default, commands are expressed as regular expressions, a simple example may look like this:

    .. sourcecode:: Python

        class SimpleDeviceStreamInterface(StreamInterface):
            commands = [
                Cmd('set_speed', r'^S=([0-9]+)$', argument_mappings=[int]),
                Cmd('get_speed', r'^S\?$'),
                Var('speed', read_pattern=r'^V\?$', write_pattern=r'^V=([0-9]+)$')
            ]

            def set_speed(self, new_speed):
                self.device.speed = new_speed

            def get_speed(self):
                return self.device.speed

    The interface has two commands, ``S?`` to return the speed and ``S=10`` to set the speed
    to an integer value. It also exposes the same speed attribute as a variable, using auto-
    generated ``V?`` and ``V=10`` commands.

    As in the :class:`lewis.adapters.epics.EpicsInterface`, it does not matter whether the
    wrapped method is a part of the device or of the interface, this is handled automatically when
    a new device is assigned to the ``device``-property.

    In addition, the :meth:`handle_error`-method can be overridden. It is called when an exception
    is raised while handling commands.
    """

    protocol = "stream"

    in_terminator = "\r"
    out_terminator = "\r"

    readtimeout = 100

    commands = None

    def __init__(self) -> None:
        super(StreamInterface, self).__init__()
        self.bound_commands = None

    @property
    def adapter(self):
        """The adapter class that serves this kind of interface."""
        return StreamAdapter

    def _bind_device(self) -> None:
        """
        This method implements ``_bind_device`` from :class:`~lewis.core.devices.InterfaceBase`.
        It binds Cmd and Var definitions to implementations in Interface and Device.

        :raises: RuntimeError: If a command refers to a member that neither interface nor
                 device provide, or if two bound commands share the same pattern.
        """
        patterns = set()

        self.bound_commands = []

        # ``commands`` defaults to None on the class; treat that as "no commands"
        # instead of failing with a TypeError when iterating.
        for cmd in self.commands or ():
            # Members of the interface take precedence over members of the device.
            bound = cmd.bind(self) or cmd.bind(self.device) or None

            if bound is None:
                raise RuntimeError(
                    "Unable to produce callable object for non-existing member '{}' "
                    "of device or interface.".format(cmd.func)
                )

            for bound_cmd in bound:
                pattern = bound_cmd.matcher.pattern
                if pattern in patterns:
                    raise RuntimeError(
                        "The regular expression {} is associated with multiple commands.".format(
                            pattern
                        )
                    )

                patterns.add(pattern)

                self.bound_commands.append(bound_cmd)

    def handle_error(self, request, error) -> None:
        """
        Override this method to handle exceptions that are raised during command processing.
        The default implementation does nothing, so that any errors are silently ignored.

        :param request: The request that resulted in the error.
        :param error: The exception that was raised.
        """