source: ResearchApps/Measurement/warpnet_framework/warpnet_client.py

Last change on this file was 1565, checked in by murphpo, 14 years ago
File size: 19.5 KB
Line 
1# WARPnet Client<->Server Architecture
2# WARPnet Client controller
3#
4# Author: Siddharth Gupta
5
6#import struct # import struct for converting string/integers to binary and defined bit widths
7from warpnet_common_params import *
8from warpnet_client_definitions import *
9from twisted.internet import reactor, stdio
10from twisted.internet.protocol import Protocol, ClientFactory
11from twisted.protocols.basic import NetstringReceiver, LineReceiver
12import json, cPickle
13from twisted.python import threadable
14
15###### Variables
16
17clientFac = 0   # global variable that holds the WARPnetClient factory instance so all functions can write to it
18handleReflectedData = False
19
20RETRANSMIT_COUNT = 10 # Max retries to send the data struct
21WAIT_TIME = 1 # Time to wait before resending data structs
22
23
24####### Class Definitions
25
26
class Register(InstructionClass):
    """Instruction that adds or removes a struct registration on the server.

    The instruction type is SC_REG_ADD when ``addReg`` is true and SC_REG_DEL
    otherwise. The outcome reported by the server is stored in
    ``returnStatus`` and ``updateComplete`` is set once a reply was handled.
    """

    def __init__(self, group, structIDList, nodeID, accessLevel, addReg=True):
        self.sendType = SC_REG_ADD if addReg else SC_REG_DEL
        self.group = group
        self.structIDList = structIDList
        self.nodeID = nodeID
        self.accessLevel = accessLevel
        self.updateComplete = False

    def sendRegistration(self):
        """Send the registration through the client factory.

        When called outside the reactor (I/O) thread the result of
        ``waitForUpdate()`` is returned (inherited helper, presumably blocks
        until ``updateComplete`` is set -- confirm in InstructionClass).
        Inside the reactor thread nothing is returned.
        """
        payload = {
            'grp': self.group,
            'structID': self.structIDList,
            'nodeID': self.nodeID,
            'accessLevel': self.accessLevel,
        }
        clientFac.sendInstruction(self.sendType, payload, self.updateFromServer)
        if threadable.isInIOThread():
            return None
        return self.waitForUpdate()

    def updateFromServer(self, data, status):
        """Callback for the server reply; records success or failure."""
        if data['stat'] != SC_REG_SUCCESS:
            # Any other status code is handed to the shared error reporter.
            checkError(data['stat'])
            self.returnStatus = False
        else:
            self.returnStatus = True
        self.updateComplete = True
54
class ConnectToNodes(InstructionClass):
    """Instruction asking the server to connect to a list of nodes.

    ``connList`` is sent to the server unchanged under the 'connList' key
    together with the group ID.
    """

    def __init__(self, connList, group):
        self.sendType = SC_CONNECT
        self.connList = connList
        self.group = group
        self.updateComplete = False

    def connect(self):
        """Send the connect instruction.

        Outside the reactor (I/O) thread the result of ``waitForUpdate()`` is
        returned; inside the reactor thread nothing is returned.
        """
        payload = {'grp': self.group, 'connList': self.connList}
        clientFac.sendInstruction(self.sendType, payload, self.updateFromServer)
        if threadable.isInIOThread():
            return None
        return self.waitForUpdate()

    def updateFromServer(self, data, status):
        """Callback for the server reply; prints the outcome and records it."""
        # (status code, message to print, resulting returnStatus)
        knownOutcomes = (
            (SC_CONNECT_SUCCESS, "Connected to nodes", True),
            (SC_CONNECT_TYPE_MISMATCH, "Node connection type mismatch", False),
            (SC_CONNECT_UNKNOWN_TYPE, "Unknown connection type", False),
        )
        stat = data['stat']
        for code, message, result in knownOutcomes:
            if stat == code:
                print(message)
                self.returnStatus = result
                break
        else:
            # Not a connect-specific code: defer to the shared error reporter.
            checkError(stat)
            self.returnStatus = False
        self.updateComplete = True
83
class GroupID(InstructionClass):
    """Instruction that adds (``connect`` true) or deletes a group ID on the server."""

    def __init__(self, group, connect):
        self.sendType = SC_GRPID_ADD if connect else SC_GRPID_DEL
        self.group = group
        self.updateComplete = False

    def register(self):
        """Send the group ID instruction.

        Outside the reactor (I/O) thread the result of ``waitForUpdate()`` is
        returned; inside the reactor thread nothing is returned.
        """
        payload = {'grp': self.group}
        clientFac.sendInstruction(self.sendType, payload, self.updateFromServer)
        if threadable.isInIOThread():
            return None
        return self.waitForUpdate()

    def updateFromServer(self, data, status):
        """Callback for the server reply; prints the outcome and records it."""
        # (status code, message template taking the group ID); both count as success
        successOutcomes = (
            (SC_GRPID_SUCCESS, "Registered Group ID %d"),
            (SC_GRPID_GROUP_KNOWN, "Group ID %d already known, nothing to do"),
        )
        stat = data['stat']
        for code, template in successOutcomes:
            if stat == code:
                print(template % self.group)
                self.returnStatus = True
                break
        else:
            checkError(stat)
            self.returnStatus = False
        self.updateComplete = True
111       
112
class StructLock(InstructionClass):
    """Instruction that locks (``lock`` true) or unlocks a struct/node pair."""

    def __init__(self, group, structID, nodeID, lock=True):
        self.sendType = SC_LOCK if lock else SC_UNLOCK
        self.group = group
        self.structID = structID
        self.nodeID = nodeID
        self.updateComplete = False

    def sendCmd(self):
        """Send the lock/unlock instruction.

        Outside the reactor (I/O) thread the result of ``waitForUpdate()`` is
        returned; inside the reactor thread nothing is returned.
        """
        payload = {'grp': self.group, 'structID': self.structID, 'nodeID': self.nodeID}
        clientFac.sendInstruction(self.sendType, payload, self.updateFromServer)
        if threadable.isInIOThread():
            return None
        return self.waitForUpdate()

    def updateFromServer(self, data, status):
        """Callback for the server reply; prints the outcome and records it."""
        # (status code, message to print, resulting returnStatus)
        knownOutcomes = (
            (SC_LOCK_SUCCESS, "Successfully locked or unlocked", True),
            (SC_LOCK_EXISTS, "Struct already locked", False),
            (SC_LOCK_NOLOCK, "Struct node pair already unlocked", False),
            (SC_LOCK_NOT_OWNER, "Cannot unlock as not owner of lock", False),
        )
        stat = data['stat']
        for code, message, result in knownOutcomes:
            if stat == code:
                print(message)
                self.returnStatus = result
                break
        else:
            checkError(stat)
            self.returnStatus = False
        self.updateComplete = True
147
148# This is the class Node that is the representation of a physical WARP node. This class must be instantiated with the nodeID and the type
149# of connection the server has to that node (pcap, backdoor, socket). In addition, each Node has parameters that can be configured. These
150# are stored in instantiations of ClientStructs. Each struct added to the node must have a tag that is used to refer to that struct.
151# The addStruct function can be used to add new ClientStructs or reassign old tags with new structs. The function sendToNode requests the
152# appropriate struct for the binary representation and then sends it to the WARP board. The node also keeps track of the structs that require
153# registration with the server.
class Node:
    """Client-side representation of one node, identified by ``nodeID``.

    Holds the structs attached to the node (keyed by a caller-chosen tag),
    the access level each struct is registered with, and whether unrequested
    data for a struct should be handled.
    """
    nodeID = -1
    serverType = -1

    def __init__(self, nodeid, servConnection, group=0):
        """Create a node.

        nodeid         -- ID used for this node in every message.
        servConnection -- connection type the server uses for the node; it is
                          forwarded unchanged in the connect request.
        group          -- group ID used for all messages of this node.
        """
        self.nodeID = nodeid
        self.nodeStructs = dict()
        self.registration = dict()
        self.handleUnrequested = dict()
        self.serverType = servConnection
        self.group = group

    def addStruct(self, tag, struct, registerWithServer=True, accessLevel=ADDRESSED_TO_ME, handleUnrequested=False):
        """Attach ``struct`` under ``tag``, replacing any struct already using that tag.

        When ``registerWithServer`` is true the struct is remembered for
        registration with ``accessLevel``. If unrequested data is to be
        handled and the access level is ADDRESSED_TO_ME, the level is widened
        to ADDRESSED_TO_MY_GROUP.
        """
        self.nodeStructs[tag] = struct
        self.handleUnrequested[tag] = handleUnrequested
        if not registerWithServer:
            return
        if handleUnrequested and accessLevel == ADDRESSED_TO_ME:
            accessLevel = ADDRESSED_TO_MY_GROUP
        self.registration[tag] = accessLevel

    def sendToNode(self, tag, makeAvailable=MAKE_AVAIL_ME):
        """Send the struct stored under ``tag`` to this node via the server.

        ``makeAvailable`` is passed to the server under the 'access' key.
        Outside the reactor (I/O) thread the result of the struct's
        ``updateComplete()`` is returned; inside the reactor thread nothing
        is returned.
        """
        struct = self.nodeStructs[tag]
        dataDict = {
            'grp': self.group,
            'structID': struct.structID,
            'nodeID': self.nodeID,
            'raw': struct.prepToSend(self.nodeID),
            'access': makeAvailable,
        }
        clientFac.sendDataStruct(SC_DATA_TO_NODE, struct.expectedReturnStructID, dataDict,
                                 struct.structID, self.nodeID, struct.callbackFromResponse)
        if not threadable.isInIOThread():
            return struct.updateComplete()

    def lockStruct(self, tag):
        """Request a lock on the struct stored under ``tag``.

        Returns whatever ``StructLock.sendCmd()`` returns, so the caller can
        see the outcome (previously the result was discarded).
        """
        lock = StructLock(self.group, self.nodeStructs[tag].structID, self.nodeID, True)
        return lock.sendCmd()

    def unlockStruct(self, tag):
        """Request release of the lock on the struct stored under ``tag``.

        Returns whatever ``StructLock.sendCmd()`` returns.
        """
        unlock = StructLock(self.group, self.nodeStructs[tag].structID, self.nodeID, False)
        return unlock.sendCmd()

    def getNodeID(self):
        """Return this node's ID."""
        return self.nodeID

    def sendAllRegs(self):
        """Register every struct recorded for registration.

        Every tag is attempted even after a failure; the results are combined
        with ``and``.
        """
        status = True
        for tag in list(self.registration):
            tempstatus = self.sendRegTag(tag)
            status = status and tempstatus
        return status

    def sendRegTag(self, tag):
        """Register the struct stored under ``tag`` with the server.

        Also records the struct in the factory's ``registeredPairs`` and, when
        unrequested data is handled for the tag, installs a permanent
        ``UnrequestedData`` receiver for the expected return struct.
        """
        struct = self.nodeStructs[tag]
        registerInst = Register(self.group, [struct.structID, struct.expectedReturnStructID],
                                self.nodeID, self.registration[tag], addReg=True)
        if self.handleUnrequested[tag]:
            clientFac.permanentQueue[(struct.expectedReturnStructID, self.nodeID)] = UnrequestedData(
                struct.expectedReturnStructID, self.nodeID, struct.callbackFromResponse)
        clientFac.registeredPairs[(struct.structID, self.nodeID)] = struct
        return registerInst.sendRegistration()
215
216
217# This function initiates the connection of this client script with the server. Additionally, the server connects to the respective nodes that
218# are requested by the script. The input network is a dictionary of Nodes
def connectToServer(network):
    """Ask the server to connect to every node in ``network``.

    network -- dictionary mapping node IDs to Node instances.

    Returns the result of ``ConnectToNodes.connect()``.
    Raises ValueError if ``network`` is empty; previously this failed with an
    unrelated unbound-variable error because the group was read from the loop
    variable after the loop.
    """
    if not network:
        raise ValueError("connectToServer requires a network with at least one node")
    connectionList = [(nodeID, network[nodeID].serverType) for nodeID in network]
    # A single connect request carries a single group ID; as before, the group
    # of the last node iterated is used.
    lastNodeID = connectionList[-1][0]
    connector = ConnectToNodes(connectionList, network[lastNodeID].group)
    return connector.connect()
226   
227# This function is used to create the network of nodes which the client controls. It requires a blank or existing dictionary of nodes and
228# a new Node instance.
def createNode(networkDict, Node):
    """Store ``Node`` in ``networkDict`` under the ID returned by ``Node.getNodeID()``.

    An entry already stored under that ID is replaced.
    """
    nodeKey = Node.getNodeID()
    networkDict[nodeKey] = Node
231
232# The sendRegistrations function registers all the structs in the Nodes in 'network' with the server
def sendRegistrations(network):
    """Call ``sendAllRegs()`` on every node in ``network`` and combine the results.

    Every node is attempted, even after an earlier one reported failure; the
    individual results are folded together with ``and``, starting from True.
    """
    outcomes = [network[nodeKey].sendAllRegs() for nodeKey in network]
    combined = True
    for outcome in outcomes:
        combined = combined and outcome
    return combined
239
240# This is a start of time call that the script must use to register its group ID. By default the group ID is 0. If the user wants different
241# group IDs, the input takes in a list of group IDs.
def registerWithServer(ids=None):
    """Register one or more group IDs with the server.

    ids -- iterable of group IDs; when omitted (None) only group 0 is registered.

    Every ID is attempted even after a failure. The individual results of
    ``GroupID.register()`` are combined with ``and``, starting from True, so
    an empty iterable yields True.
    """
    # Identity test: None is the "not supplied" marker, and an equality test
    # could be intercepted by an argument type that overrides __eq__.
    idList = [0] if ids is None else ids
    status = True
    for groupID in idList:
        tempstatus = GroupID(groupID, True).register()
        status = status and tempstatus
    return status
253   
254# If there are multiple scripts running in the system then requests from one client can be seen by others as well. To allow this client
255# to process that information call this function with hR=True.
def handleReflection(hR=False):
    """Set the module-level ``handleReflectedData`` flag to ``hR``.

    The flag is read by the client factory when a received packet matched no
    pending request: reflected data and lock notifications are only processed
    while it is true.
    """
    global handleReflectedData
    handleReflectedData = hR
259
260# If there is an emulator in the system, call this function to import in the emulator library. The return value is an instance
261# to the emulator processor.
def initEmulator():
    """Create and return an ``Emulator`` bound to the current client factory.

    The emulator module is imported here, on first use, so scripts that do
    not use an emulator never load it.
    """
    from warpnet_client_azimuth_controller import Emulator
    return Emulator(clientFac)
266
267####### Twisted Networking
268
269# WARPnet Protocol is a subclass of NetstringReceiver. The superclass implements the type checking to only decode correctly formatted
270# netstrings. When the connection is made, the protocol registers its instance with the factory for easy access. Once that is done, the
271# user script is called
class WARPnetProtocol(NetstringReceiver):
    """Netstring protocol that exchanges pickled messages with the server."""

    def stringReceived(self, data):
        """Unpickle one received netstring payload and pass it to the factory."""
        # NOTE(review): unpickling can execute arbitrary code; this is only
        # acceptable if the server connection is trusted -- confirm.
        message = cPickle.loads(data)
        self.factory.dataRecvd(message)

    def connectionMade(self):
        """Publish this protocol on the factory and start the user script.

        The script callable (``factory.scriptLoc``) is run in a reactor
        worker thread, not in the reactor thread itself.
        """
        print("Successfully connected to WARPnet Server")
        factory = self.factory
        factory.connProtocol = self
        reactor.callInThread(factory.scriptLoc)
282
283# WARPnetClient is an implementation of ClientFactory. It keeps track of the protocol, the script function and the network
class WARPnetClient(ClientFactory):
    """Client factory that tracks the connection, the user script and pending requests.

    dataQueue       -- responders waiting for a specific reply.
    permanentQueue  -- responders for unrequested data, keyed by (structID, nodeID).
    registeredPairs -- registered structs, keyed by (structID, nodeID).
    counter         -- number of SC_DATA_FROM_NODE packets received.
    """
    protocol = WARPnetProtocol

    def __init__(self, file):
        """Create the factory and publish it as the module-level ``clientFac``.

        file -- callable holding the user script; it is started by the
                protocol once the connection is made.
        """
        global clientFac
        self.scriptLoc = file
        clientFac = self
        self.dataQueue = []
        self.permanentQueue = dict()
        self.registeredPairs = dict()
        self.counter = 0

    def networkDef(self, network):
        """Remember the node network dictionary on the factory."""
        self.nodeNetwork = network

    def sendInstruction(self, instructionType, dataDict, callback=None):
        """Send an instruction and, if ``callback`` is given, queue a responder for the reply.

        Safe to call from the script thread: the work is handed to the reactor
        thread, because the responder arms a reactor timer and the queue is
        also read by the reactor thread, and neither is thread-safe.
        """
        if not threadable.isInIOThread():
            reactor.callFromThread(self.sendInstruction, instructionType, dataDict, callback)
            return
        if callback is not None:
            self.dataQueue.append(InstructionResponse(instructionType, dataDict, callback, self))
        self.sendDataToServer(instructionType, dataDict)

    def sendDataStruct(self, sendType, expectedReturnStructID, dataDict, structID, nodeID, callback=None):
        """Send a data struct and, if ``callback`` is given, queue a responder for the reply.

        Like ``sendInstruction`` this hands the work to the reactor thread
        when called from any other thread.
        """
        if not threadable.isInIOThread():
            reactor.callFromThread(self.sendDataStruct, sendType, expectedReturnStructID, dataDict,
                                   structID, nodeID, callback)
            return
        if callback is not None:
            self.dataQueue.append(DataResponse(sendType, expectedReturnStructID, dataDict, structID,
                                               nodeID, callback, self))
        self.sendDataToServer(sendType, dataDict)

    def sendDataToServer(self, sendType, dataDict):
        """Pickle ``dataDict`` plus a 'pktType' entry and send it as one netstring."""
        dictToSend = {'pktType': sendType}
        dictToSend.update(dataDict)
        pickledData = cPickle.dumps(dictToSend)
        if threadable.isInIOThread():
            self.connProtocol.sendString(pickledData)
        else:
            reactor.callFromThread(self.connProtocol.sendString, pickledData)

    def removeFromQueue(self, instToRemove):
        """Remove a responder from the pending queue."""
        self.dataQueue.remove(instToRemove)

    def dataRecvd(self, data):
        """Dispatch one received packet.

        Order: pending responders, then the permanent responder for the
        packet's (structID, nodeID), then reflected data and lock
        notifications when enabled. Anything left over is reported as unknown.
        """
        if data['pktType'] == SC_DATA_FROM_NODE:
            self.counter += 1
        for responder in self.dataQueue:
            if responder.responseCheck(data):
                return
        # Only the key lookup is guarded, so a KeyError raised inside a user
        # callback is no longer silently swallowed.
        try:
            pairKey = (data['structID'], data['nodeID'])
        except KeyError:
            pairKey = None
        permanent = self.permanentQueue.get(pairKey)
        if permanent is not None and permanent.responseCheck(data):
            return
        if handleReflectedData and self._handleReflected(data, pairKey):
            return
        print("unknown received struct %s" % data)

    def _handleReflected(self, data, pairKey):
        """Process reflected data and lock notifications; return True when handled."""
        pktType = data['pktType']
        if pktType not in (SC_DATA_TO_NODE_REFLECT, SC_LOCK_NOTIFICATION, SC_UNLOCK_NOTIFICATION):
            return False
        structinst = self.registeredPairs.get(pairKey)
        if structinst is None:
            # The pair is not registered here (or the packet carried no pair).
            return False
        if pktType == SC_DATA_TO_NODE_REFLECT:
            # Another client sent this struct; wait for the node's answer too.
            self.dataQueue.append(DataReflect(structinst.expectedReturnStructID, data['structID'],
                                              data['nodeID'], structinst.callbackFromResponse, self))
            print("reflected data")
        elif pktType == SC_LOCK_NOTIFICATION:
            structinst.lockStatus = True
            print("lock notification")
        else:
            structinst.lockStatus = False
            print("unlock notification")
        return True
356       
357
358
class DataResponse(WaitForResponse):
    """Pending responder for a data struct sent to a node.

    Resends the struct every WAIT_TIME seconds until a matching reply
    arrives or RETRANSMIT_COUNT resends have been made.
    """

    def __init__(self, sentStructType, expectedReturnStructID, dataDict, structID, nodeID, instanceToCall, factoryInstance):
        """Store the request details and arm the first timeout.

        instanceToCall  -- callback invoked as ``instanceToCall(data, ok)``.
        factoryInstance -- factory whose queue holds this responder.
        """
        self.sentStructType = sentStructType
        self.expectedReturnStructID = expectedReturnStructID
        self.dataDict = dataDict
        self.structID = structID
        self.nodeID = nodeID
        self.instanceToCall = instanceToCall
        self.factoryInstance = factoryInstance

        self.resendCounter = 0
        self.timeoutTimer = reactor.callLater(WAIT_TIME, self.timeoutExpired)

    def responseCheck(self, data):
        """Return True if ``data`` answers this request.

        A matching SC_DATA_FROM_NODE packet invokes the callback with True;
        an SC_STAT packet for SC_DATA_TO_NODE invokes it with False. In both
        cases the timer is cancelled and the responder leaves the queue first.
        """
        pktType = data['pktType']
        if pktType == SC_DATA_FROM_NODE:
            if data['structID'] != self.expectedReturnStructID or data['nodeID'] != self.nodeID:
                return False
            succeeded = True
        elif pktType == SC_STAT and data['reqType'] == SC_DATA_TO_NODE:
            succeeded = False
        else:
            return False

        if self.timeoutTimer.active():
            self.timeoutTimer.cancel()
        self.factoryInstance.removeFromQueue(self)
        self.instanceToCall(data, succeeded)
        return True

    def timeoutExpired(self):
        """Resend the struct, or give up and report C_TIMEOUT after too many resends."""
        self.resendCounter += 1
        if self.resendCounter <= RETRANSMIT_COUNT:
            print("resending after timeout data %x" % self.structID)
            self.factoryInstance.sendDataToServer(self.sentStructType, self.dataDict)
            self.timeoutTimer = reactor.callLater(WAIT_TIME, self.timeoutExpired)
            return
        self.factoryInstance.removeFromQueue(self)
        self.instanceToCall({'stat': C_TIMEOUT}, False)
404
class InstructionResponse(WaitForResponse):
    """Pending responder for an instruction sent to the server.

    Resends the instruction every WAIT_TIME seconds until an SC_STAT reply
    for it arrives or RETRANSMIT_COUNT resends have been made.
    """

    def __init__(self, instructionType, dataDict, instanceToCall, factoryInstance):
        """Store the request details and arm the first timeout.

        instanceToCall  -- callback invoked as ``instanceToCall(data, ok)``.
        factoryInstance -- factory whose queue holds this responder.
        """
        self.instructionType = instructionType
        self.dataDict = dataDict
        self.instanceToCall = instanceToCall
        self.factoryInstance = factoryInstance

        self.resendCounter = 0
        self.timeoutTimer = reactor.callLater(WAIT_TIME, self.timeoutExpired)

    def responseCheck(self, data):
        """Return True if ``data`` is the SC_STAT reply to this instruction.

        On a match the timer is cancelled, the responder leaves the queue and
        the callback is invoked with the packet and True.
        """
        if data['pktType'] == SC_STAT and data['reqType'] == self.instructionType:
            if self.timeoutTimer.active():
                self.timeoutTimer.cancel()
            self.factoryInstance.removeFromQueue(self)
            self.instanceToCall(data, True)
            return True
        return False

    def timeoutExpired(self):
        """Resend the instruction, or give up after too many resends.

        On giving up the callback receives ``{'stat': C_TIMEOUT}`` and False,
        the same shape DataResponse reports. No packet exists at this point;
        passing an undefined name here used to raise NameError, so the
        callback never ran and the waiting caller was never told.
        """
        self.resendCounter += 1
        if self.resendCounter > RETRANSMIT_COUNT:
            self.factoryInstance.removeFromQueue(self)
            print("too many timeouts, cancelling send")
            self.instanceToCall({'stat': C_TIMEOUT}, False)
        else:
            print("resending after timeout instruction")
            self.factoryInstance.sendDataToServer(self.instructionType, self.dataDict)
            self.timeoutTimer = reactor.callLater(WAIT_TIME, self.timeoutExpired)
439
class UnrequestedData(WaitForResponse):
    """Permanent responder for data that arrives without a request.

    It has no timer and never removes itself. The struct and node IDs are
    stored but not compared in ``responseCheck``.
    """

    def __init__(self, expectedReturnStructID, nodeID, instanceToCall):
        self.expectedReturnStructID = expectedReturnStructID
        self.nodeID = nodeID
        self.instanceToCall = instanceToCall

    def responseCheck(self, data):
        """Invoke the callback for any SC_DATA_FROM_NODE packet; return True when invoked."""
        if data['pktType'] != SC_DATA_FROM_NODE:
            return False
        self.instanceToCall(data, True)
        return True
452
class DataReflect(WaitForResponse):
    """Short-lived responder for the node reply to another client's reflected request.

    It never resends: if no matching reply arrives within WAIT_TIME seconds
    it simply leaves the queue.
    """

    def __init__(self, expectedReturnStructID, structID, nodeID, instanceToCall, factoryInstance):
        """Store what to wait for and arm the expiry timer.

        instanceToCall  -- callback invoked as ``instanceToCall(data, True)``.
        factoryInstance -- factory whose queue holds this responder.
        """
        self.expectedReturnStructID = expectedReturnStructID
        self.structID = structID
        self.nodeID = nodeID
        self.instanceToCall = instanceToCall
        self.factoryInstance = factoryInstance

        self.timeoutTimer = reactor.callLater(WAIT_TIME, self.timeoutExpired)

    def responseCheck(self, data):
        """Return True if ``data`` is the expected SC_DATA_FROM_NODE packet.

        On a match the timer is cancelled, the responder leaves the queue and
        the callback is invoked.
        """
        if data['pktType'] != SC_DATA_FROM_NODE:
            return False
        if data['structID'] != self.expectedReturnStructID or data['nodeID'] != self.nodeID:
            return False
        if self.timeoutTimer.active():
            self.timeoutTimer.cancel()
        self.factoryInstance.removeFromQueue(self)
        self.instanceToCall(data, True)
        return True

    def timeoutExpired(self):
        """Stop waiting: remove this responder from the queue without calling back."""
        self.factoryInstance.removeFromQueue(self)
481   
482
class DataLogger(DataCollector):
    """Writes logged data to a file, optionally flushing it on a timer."""

    def __init__(self, filename, flushTime):
        """Open (and truncate) ``filename`` for writing.

        flushTime -- seconds between periodic flushes; 0 disables them.
        """
        self.filename = filename
        self.flushTime = flushTime
        self.logFile = open(self.filename, 'w')

        # Pending periodic flush, or None when no flush is scheduled.
        self.timer = None
        if flushTime != 0:
            self.timer = reactor.callLater(self.flushTime, self.flushFile)

    def _cancelTimer(self):
        """Cancel the pending periodic flush, if any."""
        if self.timer is not None and self.timer.active():
            self.timer.cancel()
        self.timer = None

    def flushFile(self):
        """Flush buffered data to the file and schedule the next periodic flush.

        The file object is flushed in place rather than closed and reopened:
        writes queued with callFromThread hold a reference to the current
        file object and would otherwise land on a closed file.
        """
        if self.logFile.closed:
            # closeFile() already ran; do not touch the file or reschedule.
            self.timer = None
            return
        self.logFile.flush()
        # A manual call must not start a second timer chain, and a zero
        # flushTime must not create a zero-delay loop.
        self._cancelTimer()
        if self.flushTime != 0:
            self.timer = reactor.callLater(self.flushTime, self.flushFile)

    def log(self, dataToLog):
        """Write ``dataToLog`` to the file; from other threads the write is run by the reactor thread."""
        if threadable.isInIOThread():
            self.logFile.write(dataToLog)
        else:
            reactor.callFromThread(self.logFile.write, dataToLog)

    def closeFile(self):
        """Stop the periodic flush and close the file."""
        # The timer belongs to the reactor, so cancel it from the reactor thread.
        if threadable.isInIOThread():
            self._cancelTimer()
        else:
            reactor.callFromThread(self._cancelTimer)
        self.logFile.close()
506
507
508
509
510           
class CmdReader(LineReceiver):
    """Line-based command reader with a few built-in debugging commands.

    'a' prints the factory queues, 'c' prints the received-data counter and
    'q' stops the reactor; any other line goes to the handler given at
    construction.
    """
    # Lines are split on the platform's line separator.
    from os import linesep as delimiter

    def __init__(self, functionCall=None):
        """functionCall -- callable taking one line; when None, other lines are ignored."""
        self.func = self.noContinue if functionCall is None else functionCall

    def noContinue(self, line):
        """Default handler: ignore the line."""
        pass

    def lineReceived(self, line):
        """Run a built-in command or forward the line to the handler."""
        if line == 'q':
            reactor.stop()
        elif line == 'c':
            print(clientFac.counter)
        elif line == 'a':
            print(clientFac.dataQueue)
            print(clientFac.permanentQueue)
            print(clientFac.registeredPairs)
        else:
            self.func(line)
535
536
537
538
539
540
541
542
543
544
545
Note: See TracBrowser for help on using the repository browser.