Changeset 88
- Timestamp:
- 03/18/08 23:56:20 (10 months ago)
- Files:
-
- branches/spike-20070613 tubes/tubes.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/spike-20070613 tubes/tubes.py
r86 r88 1 # vim:ts=4:sw=4:expandtab:smarttab 2 3 1 4 """Tubes is a library for moving data between nodes where each node can process 2 5 those data before passing them onto other nodes. Conceptually a graph is … … 12 15 for specifying constants. 13 16 """ 17 18 19 import threading 20 import time 21 import sys 22 23 24 # An output posts a notification when it has data ready. The "read" method on 25 # the output can be called to retrieve the data. At that point the data is no 26 # longer available on the output. This is a one-time deal. The way this works 27 # in the system is that the Graph object itself (or perhaps an Edge object...) 28 # listens for the notification. When it is received, it keeps a copy for each 29 # Edge/Sink that is associated with the Source. Then, each Sink receives a 30 # notification that there is input data ready. The Sink may then call a 31 # function on the edge that reads the data. In some ways the edges are simply 32 # acting as buffer proxies for the Sources. 33 34 35 def curry(_curried_func, *args, **kwargs): 36 def _curried(*moreargs, **morekwargs): 37 return _curried_func(*(args+moreargs), **dict(kwargs, **morekwargs)) 38 return _curried 39 40 41 42 43 class Graph(object): 44 """A directed, non-cyclical graph """ 45 def __init__(self): 46 self.nodes = [] 47 self.edges = {} 48 self.nc = NotificationCenter() 49 50 def load(self, filename): 51 pass 52 53 def save(self, filename): 54 pass 55 56 def addNode(self, node): 57 self.nodes.append(node) 58 59 def removeNode(self, node): 60 self.nodes.remove(node) 61 62 def connect(self, source, sink): 63 self.nc.addListener(source, 'DATA_READY', curry(self.sourceDataReady, sink=sink)) 64 self.nc.addListener(sink, 'DATA_REQUEST', curry(self.sinkDataRequest, source=source)) 65 if source not in self.edges: 66 self.edges[source] = [] 67 self.edges[source].append(sink) 68 69 self.nc.post(source, 'CONNECT') 70 self.nc.post(sink, 'CONNECT') 71 72 def disconnect(self, source, sink): 73 self.nc.removeListener(source, 'DATA_READY', self.sourceDataReady) 74 self.nc.removeListener(sink, 'DATA_REQUEST', self.sinkDataRequest) 75 if source in self.edges: 76 self.edges[source].remove(sink) 77 78 self.nc.post(source, 'DISCONNECT') 79 self.nc.post(sink, 'DISCONNECT') 80 81 def sourceDataReady(self, source, signal, sink): 82 sink.write(source.read()) 83 84 def sinkDataRequest(self, sink, signal, source): 85 sink.write(source.read()) 14 86 15 87 class Node(object): … … 24 96 they can be tied to the outputs of other Nodes. 25 97 """ 26 inputs = None27 outputs = None28 29 def __init__(self): 30 """Initialize a new Node."""31 self.inputs = []32 self.outputs = []33 34 def get Inputs(self):35 """Return a sequence of inputs available on this Node."""36 return self.inputs37 38 def get Outputs(self):39 """Return a sequence of outputs available on this Node."""40 return self.outputs98 sinks = None 99 sources = None 100 101 def __init__(self): 102 """Initialize a new Node.""" 103 self.sinks = [] 104 self.sources = [] 105 106 def getSinks(self): 107 """Return a sequence of inputs available on this Node.""" 108 return self.sinks 109 110 def getSources(self): 111 """Return a sequence of outputs available on this Node.""" 112 return self.sources 41 113 42 114 class NodeInputOutput(object): 43 """NodeInputOutput acts as an Input or an Outputfor a Node. It buffers data that115 """NodeInputOutput acts as an Sink or an Source for a Node. It buffers data that 44 116 is written to it so that it may later be read (in order). It doesn't care 45 whether it is used as an Input or as an Outputand really could be used as117 whether it is used as an Sink or as an Source and really could be used as 46 118 either.""" 47 def __init__(self): 48 self.buffer = [] 49 self.nc = NotificationCenter() 50 self.source = None 119 120 def __init__(self): 121 self.buffer = [] 122 self.nc = NotificationCenter() 123 self.source = None 51 124 52 125 def connect(self, output): 53 self.source = output54 self.nc.addListener(output, 'DATA_READY', self.processInput)126 self.source = output 127 self.nc.addListener(output, 'DATA_READY', self.processInput) 55 128 56 129 def disconnect(self): 57 self.nc.removeListener(output, 'DATA_READY', self.processInput)58 self.source = None130 self.nc.removeListener(output, 'DATA_READY', self.processInput) 131 self.source = None 59 132 60 133 def processInput(self, node_output, signal): 61 if signal == 'DATA_READY':62 self.write(node_output.read())134 if signal == 'DATA_READY': 135 self.write(node_output.read()) 63 136 64 137 … … 67 140 #### Node Output 68 141 def readyData(self): 69 return len(self.buffer)142 return len(self.buffer) 70 143 71 144 def read(self, count=1): 72 #print '>>>>>>> read(), self.buffer=%s' % (self.buffer,)73 74 if count == 0:75 count = len(self.buffer)76 77 ret = self.buffer[0:count]78 #print 'ret = %s' % (ret,)79 del self.buffer[0:count]80 #print 'ret = %s' % (ret,)81 return ret82 145 #print '>>>>>>> read(), self.buffer=%s' % (self.buffer,) 146 147 if count == 0: 148 count = len(self.buffer) 149 150 ret = self.buffer[0:count] 151 #print 'ret = %s' % (ret,) 152 del self.buffer[0:count] 153 #print 'ret = %s' % (ret,) 154 return ret 155 83 156 def readSample(self): 84 return self.read()[0]157 return self.read()[0] 85 158 86 159 87 160 #### Node Input 88 161 def write(self, data): 89 self.buffer += data90 #print 'posting DATA_READY'91 self.nc.post(self, 'DATA_READY')162 self.buffer += data 163 #print 'posting DATA_READY' 164 self.nc.post(self, 'DATA_READY') 92 165 93 166 def writeSample(self, data): 94 self.write((data,))95 96 class Input(object):97 """An Inputon a Node that receives incoming data.167 self.write((data,)) 168 169 class Sink(object): 170 """An Sink on a Node that receives incoming data. 98 171 99 172 Signals: 100 DataRequest- This input is ready to read data from the connected output.173 DATA_REQUEST - This input is ready to read data from the connected output. 101 174 """ 102 175 def __init__(self): 103 self.source = None 104 self.nc = NotificationCenter() 105 106 def connect(self, output): 107 """Associate this Input with the given Output so that the Input can pull data from the Output when necessary.""" 108 self.source = output 109 self.nc.post(self, 'CONNECT') 110 111 def disconnect(self): 112 self.nc.post(self, 'DISCONNECT') 113 self.source = None 176 pass 114 177 115 178 def write(self, data): 116 raise NotImplemented()117 118 class Output(object):119 """An Output on a Node that sends data to Inputs.120 179 raise NotImplemented() 180 181 class Source(object): 182 """An Source on a Node that sends data to Sinks. 183 less you call and do an e-check kinda thing 121 184 Signals: 122 DataRead- Data is ready to be read from this output.185 DATA_READY - Data is ready to be read from this output. 123 186 """ 124 # signalListeners = [] 125 126 def __init__(self): 127 """Initialize a new Output.""" 128 # self.signalListeners = {} 187 188 def __init__(self): 189 """Initialize a new Source.""" 190 pass 129 191 130 192 def read(self): 131 """Read data from this Output if available. If none is available then return None. i.e. this is non-blocking.""" 132 raise NotImplemented() 193 """Read data from this Source if available. If none is available then return None. i.e. this is non-blocking.""" 194 raise NotImplemented() 195 196 def numSamplesReady(self): 197 raise NotImplemented() 133 198 134 199 # def addListener(self, listener, signal): 135 # self.signalListeners[signal].append(listener)200 # self.signalListeners[signal].append(listener) 136 201 # 137 202 # def tripSignal(self, signal): 138 # if self.signalListeners.hasKey(signal):139 # listeners = self.signalListeners[signal]140 # for listener in listeners:141 # listener(self, signal)203 # if self.signalListeners.hasKey(signal): 204 # listeners = self.signalListeners[signal] 205 # for listener in listeners: 206 # listener(self, signal) 142 207 143 208 … … 168 233 169 234 def __new__(cls, *args, **kwargs): 170 """Overrides default object creation to ensure that only one instance171 per class is ever returned. These instances are stored in a dictionary172 on the Singleton class that is keyed on the type of the object being173 created."""174 if not Singleton.__singletons.has_key(cls):175 Singleton.__singletons[cls] = None176 177 if Singleton.__singletons[cls] is None:178 Singleton.__singletons[cls] = object.__new__(cls, *args, **kwargs)179 180 return Singleton.__singletons[cls]235 """Overrides default object creation to ensure that only one instance 236 per class is ever returned. These instances are stored in a dictionary 237 on the Singleton class that is keyed on the type of the object being 238 created.""" 239 if not Singleton.__singletons.has_key(cls): 240 Singleton.__singletons[cls] = None 241 242 if Singleton.__singletons[cls] is None: 243 Singleton.__singletons[cls] = object.__new__(cls, *args, **kwargs) 244 245 return Singleton.__singletons[cls] 181 246 182 247 183 248 class NotificationCenter(Singleton): 184 249 def __init__(self): 185 self.listeners = {} 250 self.listeners = {} 251 self.lock = threading.RLock() 186 252 187 253 def _ensureListenerList(self, source, signal): 188 if not self.listeners.has_key((source, signal)): 189 self.listeners[(source, signal)] = [] 254 if not self.listeners.has_key((source, signal)): 255 print "NotificationCenter: creating listener list for (source, signal) = (%s, %s)" % (source, signal) 256 self.listeners[(source, signal)] = [] 190 257 191 258 def addListener(self, source, signal, callable): 192 #print 'adding listener: source=%s, signal=%s, callable=%s' % (source, signal, callable) 193 self._ensureListenerList(source, signal) 194 self.listeners[(source, signal)].append(callable) 259 self.lock.acquire() 260 261 #print 'adding listener: source=%s, signal=%s, callable=%s' % (source, signal, callable) 262 self._ensureListenerList(source, signal) 263 print "NotificationCenter.addListener(%s, %s, %s)" % (source, signal, callable) 264 self.listeners[(source, signal)].append(callable) 265 266 self.lock.release() 195 267 196 268 def removeListener(self, source, signal, func): 197 key = (source, signal) 198 if self.listeners.has_key(key): 199 f = lambda x: x is not func 200 self.listeners[key] = filter(f, self.listeners[key]) 269 self.lock.acquire() 270 271 key = (source, signal) 272 if self.listeners.has_key(key): 273 f = lambda x: x is not func 274 self.listeners[key] = filter(f, self.listeners[key]) 275 276 self.lock.release() 201 277 202 278 def post(self, source, signal): 203 #print 'signal posted: (%s, %s)' % (source, signal) 204 key = (source, signal) 205 if self.listeners.has_key(key): 206 for f in self.listeners[key]: 207 #print '\tnotifying %s' % (f,) 208 f(source, signal) 209 279 self.lock.acquire() 280 print "NotificationCenter.post()" 281 282 #print 'signal posted: (%s, %s)' % (source, signal) 283 key = (source, signal) 284 if self.listeners.has_key(key): 285 print "\tlisteners for (source, signal) = (%s, %s)" % (source, signal) 286 for f in self.listeners[key]: 287 print '\t\tnotifying %s' % (f,) 288 f(source, signal) 289 else: 290 print "\tno listener for (source, signal) = (%s, %s)" % (source,signal) 291 292 self.lock.release() 210 293 211 294 class PassthroughNode(Node): 212 295 def __init__(self): 213 Node.__init__(self) 214 i = NodeInputOutput() 215 self.inputs.append(i) 216 self.outputs.append(i) 296 Node.__init__(self) 297 i = NodeInputOutput() 298 self.inputs.append(i) 299 self.outputs.append(i) 300 301 class TimedNode(Node): 302 class TimedSource(Source): 303 def __init__(self, timeout=2): 304 self.timeout = timeout 305 self.buffer = [] 306 self.buflock = threading.Lock() 307 self.counter = 0 308 self.nc = NotificationCenter() 309 310 self.startTimer() 311 312 def startTimer(self): 313 self.timer = threading.Timer(2, self.handleTimeout) 314 self.timer.start() 315 316 def handleTimeout(self): 317 print "handleTimeout" 318 # This function is called from another thread (Timer) so use a lock... 319 print "\thandleTimeout: acquiring lock" 320 self.buflock.acquire() 321 print "\thandleTimeout: lock acquired, inserting data" 322 323 self.counter += 1 324 self.buffer.insert(0, '<EVENT: %d>' % self.counter) 325 326 print "\thandleTimeout: releasing lock" 327 self.buflock.release() 328 329 self.startTimer() 330 331 self.nc.post(self, 'DATA_READY') 332 333 def read(self): 334 print "read" 335 # The buffer might be locked for insertion, so grab the lock first, then pop the earliest sample 336 print "\tread: acquiring lock" 337 self.buflock.acquire() 338 print "\tread: lock acquired, popping data" 339 340 ret = self.buffer.pop() 341 342 print "\tread: releasing lock" 343 self.buflock.release() 344 345 return ret 346 347 348 def __init__(self): 349 Node.__init__(self) 350 self.nc = NotificationCenter() 351 self.source = TimedNode.TimedSource() 352 self.sources = [self.source] 353 354 self.nc.addListener(self.source, 'CONNECT', self.handleConnections) 355 self.nc.addListener(self.source, 'DISCONNECT', self.handleConnections) 356 357 def handleData(self): 358 self.nc.post(self.source, 'DATA_READY') 359 360 def handleConnections(self, source, signal): 361 sink = source.sink 362 363 if signal == 'CONNECT': 364 self.nc.addListener(sink, 'DATA_REQUEST', self.handleDataRequest) 365 elif signal == 'DISCONNECT': 366 self.nc.removeListener(sink, 'DATA_REQUEST', self.handleDataRequest) 367 368 def handleDataRequest(self, sink, signal): 369 pass 370 371 372 class ConsoleInputNode(Node): 373 class ConsoleMon(threading.Thread, Source): 374 def __init__(self, update_func, delay=0.01, readlen=20, group=None, target=None, name=None): 375 threading.Thread.__init__(self, group, target, name) 376 self.delay = delay 377 self.readlen = readlen 378 self.buffer = "" 379 380 def run(self): 381 time.sleep(self.delay) 382 bytes = sys.stdin.read(self.readlen) 383 self.buffer += bytes 384 385 def numSamplesReady(self): 386 return len(self.buffer) 387 388 def read(self): 389 l = len(self.buffer) 390 ret = self.buffer[0:l] 391 del self.buffer[0:l] 392 return ret 393 394 def __init__(self): 395 Node.__init__(self) 396 self.nc = NotificationCenter() 397 self.source = ConsoleInputNode.ConsoleMon(self.handleData) 398 self.inputs.append(self.source) 399 400 self.nc.addListener(self.source, 'CONNECT', self.handleConnections) 401 self.nc.addListener(self.source, 'DISCONNECT', self.handleConnections) 402 403 self.source.start() 404 405 def handleConnections(self, source, signal): 406 sink = source.sink 407 408 if signal == 'CONNECT': 409 self.nc.addListener(sink, 'DATA_REQUEST', self.handleDataRequest) 410 elif signal == 'DISCONNECT': 411 self.nc.removeListener(sink, 'DATA_REQUEST', self.handleDataRequest) 412 413 def handleDataRequest(self, sink, signal): 414 pass 415 217 416 218 417 class ConsolePrintNode(Node): 219 418 def __init__(self): 220 Node.__init__(self)221 self.nc = NotificationCenter()222 i = Input()223 self.nc.addListener(i, 'CONNECT', self.handleConnections)224 self.nc.addListener(i, 'DISCONNECT', self.handleConnections)225 self.inputs.append(i)419 Node.__init__(self) 420 self.nc = NotificationCenter() 421 i = Sink() 422 self.nc.addListener(i, 'CONNECT', self.handleConnections) 423 self.nc.addListener(i, 'DISCONNECT', self.handleConnections) 424 self.sinks.append(i) 226 425 227 426 def handleConnections(self, node_input, signal): 228 output = node_input.source229 230 if signal == 'CONNECT':231 self.nc.addListener(output, 'DATA_READY', self.handleInput)232 elif signal == 'DISCONNECT':233 self.nc.removeListener(output, 'DATA_READY', self.handleInput)427 output = node_input.source 428 429 if signal == 'CONNECT': 430 self.nc.addListener(output, 'DATA_READY', self.handleInput) 431 elif signal == 'DISCONNECT': 432 self.nc.removeListener(output, 'DATA_READY', self.handleInput) 234 433 235 434 def handleInput(self, node_output, signal): 236 #print '>>>> handleInput (node_output=%s, signal=%s)' % (node_output, signal)237 if signal == 'DATA_READY':238 data = node_output.read()239 #print '\t\tdata=%s' % (data,)240 for datum in data:241 print "read: %s" % (datum,)242 243 class Func Output(NodeInputOutput):435 #print '>>>> handleInput (node_output=%s, signal=%s)' % (node_output, signal) 436 if signal == 'DATA_READY': 437 data = node_output.read() 438 #print '\t\tdata=%s' % (data,) 439 for datum in data: 440 print "read: %s" % (datum,) 441 442 class FuncSource(NodeInputOutput): 244 443 def getFunc(self): 245 def f(*args, **kwargs):246 self.writeSample((args, kwargs))247 248 return f444 def f(*args, **kwargs): 445 self.writeSample((args, kwargs)) 446 447 return f
