Changeset 88

Show
Ignore:
Timestamp:
03/18/08 23:56:20 (10 months ago)
Author:
jkyllo
Message:

Did some conversion from Input/Output to Sink/Source terminology.
Converted many tabs to spaces. Hopefully all.
Added TimedSource? and beginnings of ConsoleInputSource?.
Added Graph class.
Added use of locks in NotificationCenter?.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/spike-20070613 tubes/tubes.py

    r86 r88  
     1# vim:ts=4:sw=4:expandtab:smarttab 
     2 
     3 
    14"""Tubes is a library for moving data between nodes where each node can process 
    25those data before passing them onto other nodes.  Conceptually a graph is 
     
    1215for specifying constants. 
    1316""" 
     17 
     18 
     19import threading 
     20import time 
     21import sys 
     22 
     23 
     24# An output posts a notification when it has data ready.  The "read" method on 
     25# the output can be called to retrieve the data.  At that point the data is no 
     26# longer available on the output.  This is a one-time deal.  The way this works 
     27# in the system is that the Graph object itself (or perhaps an Edge object...) 
     28# listens for the notification.  When it is received, it keeps a copy for each 
     29# Edge/Sink that is associated with the Source.  Then, each Sink receives a 
     30# notification that there is input data ready.  The Sink may then call a 
     31# function on the edge that reads the data.  In some ways the edges are simply 
     32# acting as buffer proxies for the Sources. 
     33 
     34 
     35def curry(_curried_func, *args, **kwargs): 
     36    def _curried(*moreargs, **morekwargs): 
     37        return _curried_func(*(args+moreargs), **dict(kwargs, **morekwargs)) 
     38    return _curried 
     39 
     40 
     41 
     42 
     43class Graph(object): 
     44    """A directed, non-cyclical graph """ 
     45    def __init__(self): 
     46        self.nodes = [] 
     47        self.edges = {} 
     48        self.nc = NotificationCenter() 
     49 
     50    def load(self, filename): 
     51        pass 
     52 
     53    def save(self, filename): 
     54        pass 
     55 
     56    def addNode(self, node): 
     57        self.nodes.append(node) 
     58 
     59    def removeNode(self, node): 
     60        self.nodes.remove(node) 
     61 
     62    def connect(self, source, sink): 
     63        self.nc.addListener(source, 'DATA_READY', curry(self.sourceDataReady, sink=sink)) 
     64        self.nc.addListener(sink, 'DATA_REQUEST', curry(self.sinkDataRequest, source=source)) 
     65        if source not in self.edges: 
     66            self.edges[source] = [] 
     67        self.edges[source].append(sink) 
     68 
     69        self.nc.post(source, 'CONNECT') 
     70        self.nc.post(sink, 'CONNECT') 
     71 
     72    def disconnect(self, source, sink): 
     73        self.nc.removeListener(source, 'DATA_READY', self.sourceDataReady) 
     74        self.nc.removeListener(sink, 'DATA_REQUEST', self.sinkDataRequest) 
     75        if source in self.edges: 
     76            self.edges[source].remove(sink) 
     77 
     78        self.nc.post(source, 'DISCONNECT') 
     79        self.nc.post(sink, 'DISCONNECT') 
     80 
     81    def sourceDataReady(self, source, signal, sink): 
     82        sink.write(source.read()) 
     83 
     84    def sinkDataRequest(self, sink, signal, source): 
     85        sink.write(source.read()) 
    1486 
    1587class Node(object): 
     
    2496    they can be tied to the outputs of other Nodes. 
    2597    """ 
    26     inputs = None 
    27     outputs = None 
    28  
    29     def __init__(self): 
    30        """Initialize a new Node.""" 
    31        self.inputs = [] 
    32        self.outputs = [] 
    33  
    34     def getInputs(self): 
    35        """Return a sequence of inputs available on this Node.""" 
    36        return self.input
    37  
    38     def getOutputs(self): 
    39        """Return a sequence of outputs available on this Node.""" 
    40        return self.output
     98    sinks = None 
     99    sources = None 
     100 
     101    def __init__(self): 
     102        """Initialize a new Node.""" 
     103        self.sinks = [] 
     104        self.sources = [] 
     105 
     106    def getSinks(self): 
     107        """Return a sequence of inputs available on this Node.""" 
     108        return self.sink
     109 
     110    def getSources(self): 
     111        """Return a sequence of outputs available on this Node.""" 
     112        return self.source
    41113 
    42114class NodeInputOutput(object): 
    43     """NodeInputOutput acts as an Input or an Output for a Node.  It buffers data that 
     115    """NodeInputOutput acts as an Sink or an Source for a Node.  It buffers data that 
    44116    is written to it so that it may later be read (in order).  It doesn't care 
    45     whether it is used as an Input or as an Output and really could be used as 
     117    whether it is used as an Sink or as an Source and really could be used as 
    46118    either.""" 
    47     def __init__(self): 
    48         self.buffer = [] 
    49         self.nc = NotificationCenter() 
    50         self.source = None 
     119 
     120    def __init__(self): 
     121        self.buffer = [] 
     122        self.nc = NotificationCenter() 
     123        self.source = None 
    51124 
    52125    def connect(self, output): 
    53        self.source = output 
    54        self.nc.addListener(output, 'DATA_READY', self.processInput) 
     126        self.source = output 
     127        self.nc.addListener(output, 'DATA_READY', self.processInput) 
    55128 
    56129    def disconnect(self): 
    57        self.nc.removeListener(output, 'DATA_READY', self.processInput) 
    58        self.source = None 
     130        self.nc.removeListener(output, 'DATA_READY', self.processInput) 
     131        self.source = None 
    59132 
    60133    def processInput(self, node_output, signal): 
    61        if signal == 'DATA_READY': 
    62            self.write(node_output.read()) 
     134        if signal == 'DATA_READY': 
     135            self.write(node_output.read()) 
    63136 
    64137 
     
    67140    ####  Node Output 
    68141    def readyData(self): 
    69        return len(self.buffer) 
     142        return len(self.buffer) 
    70143 
    71144    def read(self, count=1): 
    72        #print '>>>>>>> read(), self.buffer=%s' % (self.buffer,) 
    73  
    74        if count == 0: 
    75            count = len(self.buffer) 
    76  
    77        ret = self.buffer[0:count] 
    78        #print 'ret = %s' % (ret,) 
    79        del self.buffer[0:count] 
    80        #print 'ret = %s' % (ret,) 
    81        return ret 
    82          
     145        #print '>>>>>>> read(), self.buffer=%s' % (self.buffer,) 
     146 
     147        if count == 0: 
     148            count = len(self.buffer) 
     149 
     150        ret = self.buffer[0:count] 
     151        #print 'ret = %s' % (ret,) 
     152        del self.buffer[0:count] 
     153        #print 'ret = %s' % (ret,) 
     154        return ret 
     155     
    83156    def readSample(self): 
    84        return self.read()[0] 
     157        return self.read()[0] 
    85158 
    86159 
    87160    ####  Node Input 
    88161    def write(self, data): 
    89        self.buffer += data 
    90        #print 'posting DATA_READY' 
    91        self.nc.post(self, 'DATA_READY') 
     162        self.buffer += data 
     163        #print 'posting DATA_READY' 
     164        self.nc.post(self, 'DATA_READY') 
    92165 
    93166    def writeSample(self, data): 
    94        self.write((data,)) 
    95  
    96 class Input(object): 
    97     """An Input on a Node that receives incoming data. 
     167        self.write((data,)) 
     168 
     169class Sink(object): 
     170    """An Sink on a Node that receives incoming data. 
    98171     
    99172    Signals: 
    100        DataRequest - This input is ready to read data from the connected output. 
     173    DATA_REQUEST - This input is ready to read data from the connected output. 
    101174    """ 
    102175    def __init__(self): 
    103         self.source = None 
    104         self.nc = NotificationCenter() 
    105  
    106     def connect(self, output): 
    107         """Associate this Input with the given Output so that the Input can pull data from the Output when necessary.""" 
    108         self.source = output 
    109         self.nc.post(self, 'CONNECT') 
    110  
    111     def disconnect(self): 
    112         self.nc.post(self, 'DISCONNECT') 
    113         self.source = None 
     176        pass 
    114177 
    115178    def write(self, data): 
    116        raise NotImplemented() 
    117  
    118 class Output(object): 
    119     """An Output on a Node that sends data to Inputs. 
    120      
     179        raise NotImplemented() 
     180 
     181class Source(object): 
     182    """An Source on a Node that sends data to Sinks. 
     183    less you call and do an e-check kinda thing 
    121184    Signals: 
    122        DataRead - Data is ready to be read from this output. 
     185    DATA_READY - Data is ready to be read from this output. 
    123186    """ 
    124 #    signalListeners = [] 
    125  
    126     def __init__(self): 
    127         """Initialize a new Output.""" 
    128 #       self.signalListeners = {} 
     187 
     188    def __init__(self): 
     189        """Initialize a new Source.""" 
     190        pass 
    129191 
    130192    def read(self): 
    131         """Read data from this Output if available.  If none is available then return None.  i.e. this is non-blocking.""" 
    132         raise NotImplemented() 
     193        """Read data from this Source if available.  If none is available then return None.  i.e. this is non-blocking.""" 
     194        raise NotImplemented() 
     195 
     196    def numSamplesReady(self): 
     197        raise NotImplemented() 
    133198 
    134199#    def addListener(self, listener, signal): 
    135 #      self.signalListeners[signal].append(listener) 
     200#   self.signalListeners[signal].append(listener) 
    136201# 
    137202#    def tripSignal(self, signal): 
    138 #      if self.signalListeners.hasKey(signal): 
    139 #          listeners = self.signalListeners[signal] 
    140 #          for listener in listeners: 
    141 #              listener(self, signal) 
     203#   if self.signalListeners.hasKey(signal): 
     204#       listeners = self.signalListeners[signal] 
     205#       for listener in listeners: 
     206#       listener(self, signal) 
    142207 
    143208 
     
    168233 
    169234    def __new__(cls, *args, **kwargs): 
    170        """Overrides default object creation to ensure that only one instance 
    171        per class is ever returned.  These instances are stored in a dictionary 
    172        on the Singleton class that is keyed on the type of the object being 
    173        created.""" 
    174        if not Singleton.__singletons.has_key(cls): 
    175            Singleton.__singletons[cls] = None 
    176  
    177        if Singleton.__singletons[cls] is None: 
    178            Singleton.__singletons[cls] = object.__new__(cls, *args, **kwargs) 
    179  
    180        return Singleton.__singletons[cls] 
     235        """Overrides default object creation to ensure that only one instance 
     236        per class is ever returned.  These instances are stored in a dictionary 
     237        on the Singleton class that is keyed on the type of the object being 
     238        created.""" 
     239        if not Singleton.__singletons.has_key(cls): 
     240            Singleton.__singletons[cls] = None 
     241 
     242        if Singleton.__singletons[cls] is None: 
     243            Singleton.__singletons[cls] = object.__new__(cls, *args, **kwargs) 
     244 
     245        return Singleton.__singletons[cls] 
    181246 
    182247 
    183248class NotificationCenter(Singleton): 
    184249    def __init__(self): 
    185         self.listeners = {} 
     250        self.listeners = {} 
     251        self.lock = threading.RLock() 
    186252 
    187253    def _ensureListenerList(self, source, signal): 
    188         if not self.listeners.has_key((source, signal)): 
    189             self.listeners[(source, signal)] = [] 
     254        if not self.listeners.has_key((source, signal)): 
     255            print "NotificationCenter: creating listener list for (source, signal) = (%s, %s)" % (source, signal) 
     256            self.listeners[(source, signal)] = [] 
    190257 
    191258    def addListener(self, source, signal, callable): 
    192         #print 'adding listener: source=%s, signal=%s, callable=%s' % (source, signal, callable) 
    193         self._ensureListenerList(source, signal) 
    194         self.listeners[(source, signal)].append(callable) 
     259        self.lock.acquire() 
     260 
     261        #print 'adding listener: source=%s, signal=%s, callable=%s' % (source, signal, callable) 
     262        self._ensureListenerList(source, signal) 
     263        print "NotificationCenter.addListener(%s, %s, %s)" % (source, signal, callable) 
     264        self.listeners[(source, signal)].append(callable) 
     265 
     266        self.lock.release() 
    195267 
    196268    def removeListener(self, source, signal, func): 
    197         key = (source, signal) 
    198         if self.listeners.has_key(key): 
    199             f = lambda x: x is not func 
    200             self.listeners[key] = filter(f, self.listeners[key]) 
     269        self.lock.acquire() 
     270 
     271        key = (source, signal) 
     272        if self.listeners.has_key(key): 
     273            f = lambda x: x is not func 
     274            self.listeners[key] = filter(f, self.listeners[key]) 
     275 
     276        self.lock.release() 
    201277 
    202278    def post(self, source, signal): 
    203         #print 'signal posted: (%s, %s)' % (source, signal) 
    204         key = (source, signal) 
    205         if self.listeners.has_key(key): 
    206             for f in self.listeners[key]: 
    207                 #print '\tnotifying %s' % (f,) 
    208                 f(source, signal) 
    209  
     279        self.lock.acquire() 
     280        print "NotificationCenter.post()" 
     281 
     282        #print 'signal posted: (%s, %s)' % (source, signal) 
     283        key = (source, signal) 
     284        if self.listeners.has_key(key): 
     285            print "\tlisteners for (source, signal) = (%s, %s)" % (source, signal) 
     286            for f in self.listeners[key]: 
     287                print '\t\tnotifying %s' % (f,) 
     288                f(source, signal) 
     289        else: 
     290            print "\tno listener for (source, signal) = (%s, %s)" % (source,signal) 
     291 
     292        self.lock.release() 
    210293 
    211294class PassthroughNode(Node): 
    212295    def __init__(self): 
    213         Node.__init__(self) 
    214         i = NodeInputOutput() 
    215         self.inputs.append(i) 
    216         self.outputs.append(i) 
     296        Node.__init__(self) 
     297        i = NodeInputOutput() 
     298        self.inputs.append(i) 
     299        self.outputs.append(i) 
     300 
     301class TimedNode(Node): 
     302    class TimedSource(Source): 
     303        def __init__(self, timeout=2): 
     304            self.timeout = timeout 
     305            self.buffer = [] 
     306            self.buflock = threading.Lock() 
     307            self.counter = 0 
     308            self.nc = NotificationCenter() 
     309 
     310            self.startTimer() 
     311 
     312        def startTimer(self): 
     313            self.timer = threading.Timer(2, self.handleTimeout) 
     314            self.timer.start() 
     315 
     316        def handleTimeout(self): 
     317            print "handleTimeout" 
     318            # This function is called from another thread (Timer) so use a lock... 
     319            print "\thandleTimeout: acquiring lock" 
     320            self.buflock.acquire() 
     321            print "\thandleTimeout: lock acquired, inserting data" 
     322 
     323            self.counter += 1 
     324            self.buffer.insert(0, '<EVENT: %d>' % self.counter) 
     325 
     326            print "\thandleTimeout: releasing lock" 
     327            self.buflock.release() 
     328 
     329            self.startTimer() 
     330             
     331            self.nc.post(self, 'DATA_READY') 
     332 
     333        def read(self): 
     334            print "read" 
     335            # The buffer might be locked for insertion, so grab the lock first, then pop the earliest sample 
     336            print "\tread: acquiring lock" 
     337            self.buflock.acquire() 
     338            print "\tread: lock acquired, popping data" 
     339 
     340            ret = self.buffer.pop() 
     341 
     342            print "\tread: releasing lock" 
     343            self.buflock.release() 
     344 
     345            return ret 
     346 
     347 
     348    def __init__(self): 
     349        Node.__init__(self) 
     350        self.nc = NotificationCenter() 
     351        self.source = TimedNode.TimedSource() 
     352        self.sources = [self.source] 
     353 
     354        self.nc.addListener(self.source, 'CONNECT', self.handleConnections) 
     355        self.nc.addListener(self.source, 'DISCONNECT', self.handleConnections) 
     356 
     357    def handleData(self): 
     358        self.nc.post(self.source, 'DATA_READY') 
     359 
     360    def handleConnections(self, source, signal): 
     361        sink = source.sink 
     362 
     363        if signal == 'CONNECT': 
     364            self.nc.addListener(sink, 'DATA_REQUEST', self.handleDataRequest) 
     365        elif signal == 'DISCONNECT': 
     366            self.nc.removeListener(sink, 'DATA_REQUEST', self.handleDataRequest) 
     367 
     368    def handleDataRequest(self, sink, signal): 
     369        pass 
     370 
     371 
     372class ConsoleInputNode(Node): 
     373    class ConsoleMon(threading.Thread, Source): 
     374        def __init__(self, update_func, delay=0.01, readlen=20, group=None, target=None, name=None): 
     375            threading.Thread.__init__(self, group, target, name) 
     376            self.delay = delay 
     377            self.readlen = readlen 
     378            self.buffer = "" 
     379 
     380        def run(self): 
     381            time.sleep(self.delay) 
     382            bytes = sys.stdin.read(self.readlen) 
     383            self.buffer += bytes 
     384 
     385        def numSamplesReady(self): 
     386            return len(self.buffer) 
     387 
     388        def read(self): 
     389            l = len(self.buffer) 
     390            ret = self.buffer[0:l] 
     391            del self.buffer[0:l] 
     392            return ret 
     393 
     394    def __init__(self): 
     395        Node.__init__(self) 
     396        self.nc = NotificationCenter() 
     397        self.source = ConsoleInputNode.ConsoleMon(self.handleData) 
     398        self.inputs.append(self.source) 
     399 
     400        self.nc.addListener(self.source, 'CONNECT', self.handleConnections) 
     401        self.nc.addListener(self.source, 'DISCONNECT', self.handleConnections) 
     402 
     403        self.source.start() 
     404 
     405    def handleConnections(self, source, signal): 
     406        sink = source.sink 
     407 
     408        if signal == 'CONNECT': 
     409            self.nc.addListener(sink, 'DATA_REQUEST', self.handleDataRequest) 
     410        elif signal == 'DISCONNECT': 
     411            self.nc.removeListener(sink, 'DATA_REQUEST', self.handleDataRequest) 
     412 
     413    def handleDataRequest(self, sink, signal): 
     414        pass 
     415 
    217416 
    218417class ConsolePrintNode(Node): 
    219418    def __init__(self): 
    220        Node.__init__(self) 
    221        self.nc = NotificationCenter() 
    222        i = Input() 
    223        self.nc.addListener(i, 'CONNECT', self.handleConnections) 
    224        self.nc.addListener(i, 'DISCONNECT', self.handleConnections) 
    225        self.inputs.append(i) 
     419        Node.__init__(self) 
     420        self.nc = NotificationCenter() 
     421        i = Sink() 
     422        self.nc.addListener(i, 'CONNECT', self.handleConnections) 
     423        self.nc.addListener(i, 'DISCONNECT', self.handleConnections) 
     424        self.sinks.append(i) 
    226425 
    227426    def handleConnections(self, node_input, signal): 
    228        output = node_input.source 
    229  
    230        if signal == 'CONNECT': 
    231            self.nc.addListener(output, 'DATA_READY', self.handleInput) 
    232        elif signal == 'DISCONNECT': 
    233            self.nc.removeListener(output, 'DATA_READY', self.handleInput) 
     427        output = node_input.source 
     428 
     429        if signal == 'CONNECT': 
     430            self.nc.addListener(output, 'DATA_READY', self.handleInput) 
     431        elif signal == 'DISCONNECT': 
     432            self.nc.removeListener(output, 'DATA_READY', self.handleInput) 
    234433 
    235434    def handleInput(self, node_output, signal): 
    236        #print '>>>> handleInput (node_output=%s, signal=%s)' % (node_output, signal) 
    237        if signal == 'DATA_READY': 
    238            data = node_output.read() 
    239            #print '\t\tdata=%s' % (data,) 
    240            for datum in data: 
    241                print "read: %s" % (datum,) 
    242  
    243 class FuncOutput(NodeInputOutput): 
     435        #print '>>>> handleInput (node_output=%s, signal=%s)' % (node_output, signal) 
     436        if signal == 'DATA_READY': 
     437            data = node_output.read() 
     438            #print '\t\tdata=%s' % (data,) 
     439            for datum in data: 
     440                print "read: %s" % (datum,) 
     441 
     442class FuncSource(NodeInputOutput): 
    244443    def getFunc(self): 
    245        def f(*args, **kwargs): 
    246            self.writeSample((args, kwargs)) 
    247  
    248        return f 
     444        def f(*args, **kwargs): 
     445            self.writeSample((args, kwargs)) 
     446 
     447        return f