# Source code for serial_mock.mock

"""
fake_serial.Serial provides our interface to our fake serial (surprise!).

Note that **data** is a special attribute: any keys passed into it will automatically have getters and setters provided for them.
"""
import random
import threading
import traceback
from itertools import cycle

import serial
import re
import time

import sys
from cStringIO import StringIO


from serial_mock.decorators import QueryStore
from serial_mock.kb_listen import KBListen
import logging
# Route log records to stdout, prefixed with level, function name and line number.
# NOTE(review): basicConfig runs at import time, so importing this module
# configures logging for the whole process -- confirm that is intended.
logging.basicConfig(stream=sys.stdout,format="%(levelname)s:%(funcName)s %(lineno)d:  %(message)s")
# Module-wide logger used by the helpers and classes in this file.
logger = logging.getLogger("serial_mock")


class _StreamHelper(object):
    """
    Helpers that decide when a message read from a stream is complete.

    ``_exit`` is a class-wide flag: :meth:`read_until` clears it when it starts
    and stops looping (returning ``None``) as soon as something sets it to ``True``.
    """
    _exit = False

    # ``basestring`` only exists on Python 2; fall back to the text/bytes types
    # so the helper keeps working on interpreters that do not define it.
    try:
        _STRING_TYPES = (basestring,)
    except NameError:
        _STRING_TYPES = (str, bytes)
    # ``re._pattern_type`` is a private name that newer Pythons no longer provide;
    # taking the type of a compiled pattern works everywhere.
    _PATTERN_TYPE = type(re.compile(""))

    @staticmethod
    def check_term(s, check_item):
        r"""
        Tell whether the text received so far satisfies a terminal condition.

        >>> _StreamHelper.check_term("asdasdaQ","Q")
        True
        >>> _StreamHelper.check_term("asdasdaQ","\r")
        False
        >>> import re
        >>> _StreamHelper.check_term("asdasdaQ",re.compile("[a-z]([A-Z].*)"))
        True
        >>> _StreamHelper.check_term("asdasdaQ",re.compile("[0-9]([A-Z].*)"))
        False
        >>> _StreamHelper.check_term("asdasdaQ",['q','Q'])
        True
        >>> _StreamHelper.check_term("asdasdaQ",['q','Z'])
        False
        >>> try: _StreamHelper.check_term("asdasdaQ",True)
        ... except Exception as e: print(e)
        ...
        Unknown Terminal Condition:True

        :param s: the string so far to check
        :param check_item: the condition: a string (``s`` must end with it), a
            compiled regular expression (searched anywhere in ``s``), or a
            list/tuple of such conditions (any one of them may match)
        :return: True or False depending on if the condition is met
        :raises Exception: if ``check_item`` is none of the supported kinds
        """
        if isinstance(check_item, _StreamHelper._STRING_TYPES):
            return s.endswith(check_item)
        if isinstance(check_item, _StreamHelper._PATTERN_TYPE):
            return bool(check_item.search(s))
        if isinstance(check_item, (list, tuple)):
            return any(_StreamHelper.check_term(s, itm) for itm in check_item)
        raise Exception("Unknown Terminal Condition:%r" % check_item)

    @staticmethod
    def read_until(stream, terminal_condition):
        r"""
        reads a stream, one character at a time, until a terminal condition is met

        >>> from cStringIO import StringIO
        >>> s = StringIO("Hello World\rBob")
        >>> _StreamHelper.read_until(s,"\r")
        'Hello World\r'
        >>> s.seek(0)
        >>> import re
        >>> _StreamHelper.read_until(s,re.compile("\s[A-C]"))
        'Hello World\rB'
        >>> s.seek(0)
        >>> _StreamHelper.read_until(s,re.compile("\s[C-Z]"))
        'Hello W'

        :param stream: object with ``read``; if it also has ``inWaiting`` it is
            only read when ``inWaiting()`` reports pending data
        :param terminal_condition: anything accepted by :meth:`check_term`
        :return: everything read up to and including the point where the
            condition was met, or ``None`` if ``_exit`` was set while waiting
        """
        s = ""
        _StreamHelper._exit = False
        # Streams without inWaiting (e.g. StringIO) cannot be polled, so they
        # are simply read on every pass.
        pollable = hasattr(stream, "inWaiting")
        while not _StreamHelper._exit:
            if s and _StreamHelper.check_term(s, terminal_condition):
                logger.debug("Response Complete(%s): %r (returning value)",
                             getattr(stream, 'port', stream), s)
                return s
            elif s:
                logger.debug("Incomplete MSG(%s)(%r not found): %r (keep waiting)",
                             getattr(stream, "port", stream), terminal_condition, s)

            chunk = ""
            if not pollable or stream.inWaiting():
                # The lock is shared with MockSerial._write_to_stream so reads
                # and writes on the stream do not interleave.
                with MockSerial._LOCK:
                    chunk = stream.read(1)
                s += chunk

            if not chunk:
                # Nothing arrived (no pending data, or an empty read): back off
                # instead of spinning at full speed, and honour a hard exit.
                if MockSerial._hard_exit:
                    sys.exit(0)
                time.sleep(0.25)

class MockSerial(object):
    """
    Base class for a scripted serial device: it reads commands from a stream,
    looks them up (simple queries first, then routes held by ``QueryStore``)
    and writes the response back.

    Subclasses customise behaviour through the class attributes below and by
    registering handlers with the decorators from ``serial_mock.decorators``.
    """
    # When True, _StreamHelper.read_until calls sys.exit(0) the next time it
    # finds no data waiting on the stream.
    _hard_exit = False
    # Shared by every instance; serialises stream reads and writes.
    _LOCK = threading.Lock()
    #: any keys defined in **data** will automatically have getters or setters created for them
    # NOTE(review): this dict lives on the class, so a "set" command mutates
    # state shared by all instances that do not override ``data``.
    data = {}
    #: the prefix to use with data auto generated routes
    data_prefix = "-"
    #: the **baudrate** we should operate at
    baudrate = 9600
    #: the **prompt** to display to the user
    prompt = ">"
    #: **delimiter** defines the character(or characters, or regexp, or list) of items that indicate our user has finished a command
    delimiter = "\r"
    #: **endline** defines the character to output after our response but before our prompt
    endline = "\r"
    logfile = None
    #: any key:value pair in simple queries is exposed as a simple query response ... and the query string must be an exact match
    #: value can be a string/unicode/bytes value or it can be a list or array, if a list or array is passed in then the responses will be cycled
    #: any other value type will be coerced to `str`
    simple_queries = {}

    def __init__(self, stream, logfile=None, **kwargs):
        """
        **MockSerial(stream:string)** instantiates a new MockStreamTunnel, stream should point to the comm port to listen on.

        *in general this class should not be directly invoked but should be subclassed, you can find some examples in the examples folder, or in the cli.py file*

        :param stream: a path to a pipe (ie "/dev/ttyS99","COM11"), a stream like object, or "DEBUG"
        :param logfile: an open file-like object, or a path that is opened in ``"wb"`` mode; every command and response is written to it
        :param data_prefix: the separator between getters/setters and the data_attribute they reference
        :param kwargs: ``data_prefix``, ``baudrate``, ``prompt``, ``delimiter`` and ``endline`` override the class attributes of the same name
        :raises Exception: if ``stream`` is a path and the port cannot be opened
        :raises AssertionError: if a stream object lacks ``read`` or ``write``

        >>> from serial_mock.decorators import serial_query
        >>> from serial_mock.mock import MockSerial
        >>> class SimpleSerial(MockSerial):
        ...     simple_queries = {
        ...        "get -name":"hello my name is bob",
        ...        "get -next":["123","456","789"],
        ...        "get -id":12
        ...     }
        ...     data={"x":6}
        ...     @serial_query("trigger command")
        ...     def do_something(self,requiredArg,optionalArg="0"):
        ...         return "RESULT: %r %r"%(requiredArg,optionalArg)
        ...
        >>> mock = SimpleSerial("DEBUG")
        >>> mock.process_cmd("trigger command 1")
        "RESULT: '1' '0'"
        >>> mock.process_cmd("trigger command 1 2")
        "RESULT: '1' '2'"
        >>> mock.process_cmd("get -name")
        'hello my name is bob'
        >>> mock.process_cmd("get -next")
        '123'
        >>> mock.process_cmd("get -next")
        '456'
        >>> mock.process_cmd("get -next")
        '789'
        >>> mock.process_cmd("get -next")
        '123'
        >>> mock.process_cmd("get -id")
        '12'
        >>> mock.process_cmd("get -x")
        '6'
        >>> mock.process_cmd('set -x 10')
        'OK'
        >>> mock.process_cmd("get -x")
        '10'
        """
        self.running = False
        self.kb = None
        QueryStore.target = self
        super(MockSerial, self).__init__()
        for key in "data_prefix baudrate prompt delimiter endline".split():
            if key in kwargs:
                setattr(self, key, kwargs.pop(key))
        self.stream = stream
        if logfile:
            if isinstance(logfile, basestring):
                logfile = open(logfile, "wb")
            self.logfile = logfile
        if isinstance(stream, basestring) and stream != "DEBUG":
            # A string other than "DEBUG" is treated as a port name to open.
            try:
                self.stream = serial.Serial(stream, self.baudrate)
            except Exception:
                raise Exception("Unable To Bind To %r" % stream)
        # Auto-generate "get <prefix><key>" / "set <prefix><key>" routes for
        # every data key; k=k freezes the key for each lambda.
        for k in self.data:
            QueryStore.register(lambda self, k=k: str(self.data.get(k, "None")),
                                "get %s%s" % (self.data_prefix, k))
            QueryStore.register(lambda self, value, k=k: self.data.update({k: value}) or "OK",
                                "set %s%s" % (self.data_prefix, k))
        # Each simple query becomes an endless iterator over its response(s).
        self._simple_queries = {}
        for k, v in self.simple_queries.items():
            if isinstance(v, basestring):
                self._simple_queries[k] = cycle([v, ])
            elif isinstance(v, (list, tuple)):
                self._simple_queries[k] = cycle(v)
            else:
                self._simple_queries[k] = cycle([str(v), ])
        # Compare by value: an identity test against a literal only works when
        # the caller's string happens to be the very same interned object.
        if self.stream == "DEBUG":
            logger.warning("Running in debug mode you may not run MainLoop!")
        elif not (hasattr(self.stream, "read") and hasattr(self.stream, "write")):
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError("STREAM must provide a minimum of read and write")

    @staticmethod
    def _read_from_stream(stream, terminal):
        """Read from ``stream`` until ``terminal`` is satisfied (see _StreamHelper.read_until)."""
        return _StreamHelper.read_until(stream, terminal)

    @staticmethod
    def _strip_backspaces(cmd):
        """Apply backspace (``\\x08``) characters: each one erases the character before it."""
        # One substitution pass only resolves non-adjacent backspaces, so
        # repeat until the text stops changing (each pass shortens it).
        while True:
            cleaned = re.sub(".\x08", "", cmd)
            if cleaned == cmd:
                return cleaned
            cmd = cleaned

    def process_cmd(self, cmd):
        """
        looks up a command to see if its registered. and returns the result if it is
        otherwise returns an error string

        in general this command should not be invoked directly (but it can be...)

        >>> from serial_mock.mock import MockSerial
        >>> inst = MockSerial("DEBUG")
        >>> inst.process_cmd("a")
        "ERROR 'a' Not Found"

        :param cmd: the command to process
        :return: a string (the result of the command); ``""`` for an empty command
        """
        cmd = self._strip_backspaces(cmd.strip())
        if self.logfile:
            self.logfile.write("<%r\n" % (cmd,))
        if not cmd:
            return ""
        if cmd in self.simple_queries:
            result = next(self._simple_queries[cmd])
            logger.debug("Simple Query Response:%r -> %r", cmd, result)
            return result
        try:
            method, rest = QueryStore._find(cmd)
        except KeyError:
            traceback.print_exc()
            return "ERROR %r Not Found" % (cmd,)
        try:
            logger.debug("calling function: %r", method.__name__)
            result = method(self, *rest)
            logger.debug("%s returns: %r", method.__name__, result)
            return result
        except Exception as e:
            # A failing handler is reported to the client instead of killing the loop.
            traceback.print_exc()
            return "ERROR %r : %s" % (cmd, e)

    def _process_keydown(self, key):
        """Run the handler bound to ``key`` in QueryStore, if there is one."""
        handler = QueryStore._find_key_binding(key)
        if not handler:
            return
        handler(self)

    def _write_to_stream(self, response):
        """
        Write ``response`` followed by ``endline`` to the stream (or print it
        when the stream is the "DEBUG" placeholder). Empty responses are skipped.
        """
        if not response:
            return
        if self.logfile:
            self.logfile.write(">%r\n" % (response,))
        payload = "%s%s" % (response, self.endline)
        with self._LOCK:
            if self.stream == "DEBUG":
                # There is no real stream in debug mode; echo to stdout instead.
                print(payload)
            else:
                self.stream.write(payload)

    def terminate(self):
        """
        stop the MainLoop if running

        :return:
        """
        self.running = False
        try:
            self.stream.close()
        except Exception:
            logger.warning("unable to close stream...skipping")
        if self.kb:
            self.kb.halt = True
        # Unblocks a read_until call that is still waiting for input.
        _StreamHelper._exit = True
        logger.debug("Terminate Flags set!")

    def MainLoop(self):
        """
        Mainloop will run forever serving the rules provided in the subclass to the bound pipe

        :raises AssertionError: if the instance was created with the "DEBUG" stream
        """
        if self.stream == "DEBUG":
            raise AssertionError("MainLoop can not run on the DEBUG stream")
        self.running = True
        if QueryStore.__keybinds__:
            self.kb = KBListen(self._process_keydown)
            self.kb.Listen()
        print("LISTENING ON: %s" % (self.stream,))
        while self.running:
            self.stream.write(self.prompt)
            try:
                cmd = self.process_cmd(self._read_from_stream(self.stream, self.delimiter))
            except BaseException:
                # Deliberately broad: Ctrl-C and the sys.exit() issued by the
                # read loop must also shut the mock down cleanly.
                logger.info("Leaving MainLoop")
                return self.terminate()
            self._write_to_stream(cmd)
        self.terminate()
        logger.info("Leaving MainLoop")


class DummySerial(serial.Serial):
    r"""
    DummySerial provides a serial.Serial interface into a MockSerial instance.
    you can use this as a dropin replacement to serial.Serial, for anything that accepts serial.Serial as an argument

    Everything written is buffered until the mock's ``delimiter`` is seen; the
    buffered command is then processed and the response (plus the mock's
    prompt) becomes available to ``read``.

    >>> from serial_mock.mock import DummySerial,MockSerial
    >>> from serial_mock.decorators import serial_query
    >>> class MyInterface(MockSerial):
    ...     @serial_query("trigger command")
    ...     def do_something(self,requiredArg,optionalArg="0"):
    ...         return "RESULT: %r %r"%(requiredArg,optionalArg)
    ...
    >>> ser = DummySerial(MyInterface)
    >>> ser.write("trigger command 5\r")
    18L
    >>> ser.read(ser.inWaiting())
    "RESULT: '5' '0'\r>"
    >>> ser.write("trigger command 1 2\r")
    20L
    >>> ser.read(ser.inWaiting())
    "RESULT: '1' '2'\r>"
    """
    is_open = True
    # serial.Serial.__init__ is never called, so these private attributes are
    # given placeholder values here.
    # NOTE(review): presumably this keeps inherited serial.Serial properties
    # readable -- confirm against the installed pyserial version.
    _port_handle = None
    _baudrate = "ANY"
    _bytesize = "ANY"
    _parity = "ANY"
    _stopbits = 1
    _timeout = None
    _xonxoff = None
    _rtscts = None
    _dsrdtr = None

    def __init__(self, MockSerialClass):
        """
        :param MockSerialClass: the MockSerial subclass to drive; it is
            instantiated on an in-memory StringIO stream
        """
        self.myMock = MockSerialClass(StringIO())
        self.rx_buffer = ""   # text written by the caller, not yet processed
        self.tx_buffer = ""   # responses waiting to be read by the caller
        self.port = "MOC1"
        self.is_open = True

    def open(self):
        """No real port to open; always reports success."""
        return True

    def close(self):
        """No real port to close; always reports success."""
        return True

    @property
    def in_waiting(self):
        """Property form of :meth:`inWaiting`."""
        return self.inWaiting()

    def inWaiting(self):
        """Return how many response characters are ready to be read."""
        return len(self.tx_buffer)

    def write(self, msg):
        """
        Buffer ``msg``; once the buffer satisfies the mock's ``delimiter`` the
        command is processed and its response queued for :meth:`read`.

        :param msg: text sent to the mock
        :return: the number of characters accepted (``len(msg)``)
        """
        self.rx_buffer += msg
        # Test against the mock's delimiter: comparing the buffer with itself
        # is always true and would process partial commands.
        if _StreamHelper.check_term(self.rx_buffer, self.myMock.delimiter):
            mock = self.myMock
            mock._write_to_stream(mock.process_cmd(self.rx_buffer))
            # Drain whatever the mock wrote into its in-memory stream.
            mock.stream.seek(0)
            self.tx_buffer += mock.stream.read() + mock.prompt
            mock.stream.truncate(0)
            self.rx_buffer = ""
        return long(len(msg))

    def read(self, bytes=1):
        """
        Remove and return up to ``bytes`` characters from the response buffer.

        :param bytes: maximum number of characters to return
        :return: the characters read (shorter, possibly empty, if fewer are queued)
        """
        resp, self.tx_buffer = self.tx_buffer[:bytes], self.tx_buffer[bytes:]
        return resp


class EmittingSerial(MockSerial):
    """
    Reference MockSerial that also writes an unsolicited "heartbeat" message
    to the stream at random intervals while its MainLoop is running.
    """
    #: the message written on every heartbeat
    emit = "EMIT MSG"
    #: (low, high) bounds, in seconds, of the random wait before the emit cycle is started
    delay = 5,35
    #: (low, high) bounds, in seconds, of the random wait before each emit
    interval = 15,35

    def __init__(self, stream, logfile=None, **kwargs):
        """
        **EmittingSerial(stream:string)** provides a reference class on an interface that periodically emits a "heartbeat" type message

        :param stream: a path to a pipe (ie "/dev/ttyS99","COM11"), a stream like object, or "DEBUG"
        :param data_prefix: the separator between getters/setters and the data_attribute they reference
        """
        super(EmittingSerial, self).__init__(stream, logfile, **kwargs)

    def _on_start_emit(self):
        """Schedule the next heartbeat after a random wait drawn from ``interval``."""
        self.emit_timer = threading.Timer(random.uniform(*self.interval), self._on_emit)
        self.emit_timer.start()

    def _on_emit(self):
        """Write the heartbeat and schedule the next one, but only while running."""
        if self.running:
            self._write_to_stream(self.emit)
            self._on_start_emit()

    def MainLoop(self):
        """
        Start the heartbeat cycle, then serve commands like MockSerial.MainLoop.
        The pending timer is cancelled when the loop ends, however it ends.
        """
        self.emit_timer = threading.Timer(random.uniform(*self.delay), self._on_start_emit)
        self.emit_timer.start()
        try:
            MockSerial.MainLoop(self)
        finally:
            # cancel() is a harmless no-op if the timer already fired, so no
            # exception handling is needed; finally also covers the case where
            # MainLoop raises before or while serving.
            self.emit_timer.cancel()


if __name__ == "__main__":
    # Small smoke demo: route one command through a DummySerial and show the
    # round trip on stdout.
    class TestClass(MockSerial):
        @QueryStore("hello")
        def say_hello(self, name="BOB"):
            return "Hello, %s" % name

    dummy = DummySerial(TestClass)
    print(" ".join([str(isinstance(dummy, serial.Serial)), str(dummy)]))
    sent = dummy.write("hello joey\r")
    print(" ".join(["sent:", repr(sent)]))
    received = dummy.read(dummy.inWaiting())
    print(" ".join(["RECV:", repr(received)]))