source: ResearchApps/Measurement/warpnet_framework/warpnet_server.py

Last change on this file was 1558, checked in by sgupta, 14 years ago

moved warpnet

File size: 30.0 KB
Line 
1# WARPnet Client<->Server Architecture
2# WARPnet Server system
3#
4# Author: Siddharth Gupta
5
6from warpnet_common_params import *
7from warpnet_server_params import *
8from twisted.internet import stdio, reactor
9from twisted.internet.protocol import Protocol, Factory, ClientFactory
10from twisted.protocols.basic import NetstringReceiver, LineReceiver
11import struct, json, cPickle, Queue, pprint, time
12import pcapy
13from impacket import ImpactPacket
14
15debugLevel = 0  # Determines if the server prints data to the command line. To enable debug mode call
16
17ethInterface = ''
18serverPort = 10101
19localMacAddress = range(0,6)
20
21timePcapFactory = 0
22timePcapSend = 0
23
24# A class created for comparison purposes. A dontcare will always return True when compared to another integer. This allows for a
25# very general search/subset function when using the registration and lock controllers
26class dontcare(int):
27    def __eq__(self, other):
28        return True
29   
30    def __ne__(self, other):
31        return False
32
33# This class is responsible for tracking all the existing controllers connected to the server. It keeps track
34# of two structures: (1) a list of the controller instance, the id and connected groups, (2) a dictionary where the
35# key is the group and the value is a list of controllers that belong to that group id.
36class controllerConnection():
37    def __init__(self):
38        self.contList = []
39        self.grpDict = dict()
40        self.availableCont = range(1,256)   # New controller ids are handed out based on available ones. Once a controller
41                                            # disconnectes its id is put back into the list
42
43    # Called once when a new controller joins. Is assigned an available id and added to the controller instance list
44    def addNewConn(self, cInst):
45        cID = self.availableCont.pop(0);
46        self.contList.append((cInst, cID, []))
47        return cID
48
49    # Called when a controller disconnects. Get rid of the instance from the controller list and any references to it in the group dictionary.
50    def deleteInst(self, cInst):
51        for x in self.contList[:]:  #make a copy of the list before mutating it
52            if x[0] == cInst:
53                cID = x[1]
54                self.contList.remove(x)
55        for x in self.grpDict.keys():
56            if cID in self.grpDict[x]:
57                self.grpDict[x].remove(cID)
58        self.availableCont.append(cID)  # Let the server re-assign the id to a controller
59   
60    # Return the instance of the controller given the controllerID
61    def getInst(self, cID):
62        for x in self.contList:
63            if (x[1] == cID):
64                return x[0]
65        return None
66   
67    # Return all controllers that are part of a group
68    def getIDs(self, cGrp):
69        return self.grpDict(cGrp)
70   
71    # Check if a given controller is part of a group
72    def inGrp(self, cID, cGrp):
73        for x in self.contList:
74            if (x[1] == cID):
75                if cGrp in x[2]:
76                    return True
77                break
78        return False
79       
80    # Add a group association to a controller ID. The group is added to both the controller list and group dictionary.
81    def addGrp(self, cID, cGrp):
82        for x in self.contList:
83            if x[1] == cID:
84                if cGrp in x[2]:
85                    return SC_GRPID_GROUP_KNOWN
86                else:
87                    x[2].append(cGrp)
88
89        if cGrp not in self.grpDict.keys():
90            self.grpDict[cGrp] = [cID]
91        else:
92            self.grpDict[cGrp].append(cID)
93
94        return SC_GRPID_SUCCESS
95   
96    # Delete a group association for a particular controller id.
97    def delGrp(self, cID, cGrp):
98        for x in self.contList:
99            if x[1] == cID:
100                if cGrp in x[2]:
101                    x[2].remove(cGrp)
102               
103        if cID in self.grpDict[cGrp]:
104            self.grpDict[cGrp].remove(cID)
105
106        return SC_GRPID_SUCCESS
107   
108    # Returns the number of groups that a controller is associated with
109    def numGrps(self, cID):
110        for x in self.contList:
111            if x[1] == cID:
112                return len(x[2])
113   
114    # Display the state of the two lists
115    def disp(self):
116        print 'Controller List:'
117        pprint.pprint(self.contList)
118        print 'Group Dictionary:'
119        pprint.pprint(self.grpDict)
120       
121
122# This class keeps track of all registrations in the system.
123class registrationMaster():
124   
125    def __init__(self):
126        self.regList = []
127   
128    def addRegistration(self, cID, cGrp, cInst, structID, nodeID, accessLevel):
129        self.deleteRegistration(cID, cGrp, cInst, structID, nodeID)
130        regToAdd = (cInst, cID, cGrp, structID, nodeID, accessLevel)
131        self.regList.append(regToAdd)
132        return SC_REG_SUCCESS
133
134    def deleteRegistration(self, cID, cGrp, cInst, structID, nodeID):
135        listOfRegs = self.subsetQuery(cID=cID, cGrp=cGrp, cInst=cInst, structID=structID, nodeID=nodeID)
136        for x in listOfRegs:
137            self.regList.remove(x)
138        return SC_REG_SUCCESS
139   
140    def deleteRegistrationAll(self, cID):
141        listOfRegs = self.subsetQuery(cID=cID)
142        for x in listOfRegs:
143            self.regList.remove(x)
144
145    def deleteRegistrationGroup(self, cID, cGrp):
146        listOfRegs = self.subsetQuery(cID=cID, cGrp=cGrp)
147        for x in listOfRegs:
148            self.regList.remove(x)
149
150    def filterRegs(self, cID, cGrp, structID, nodeID, makeAvailable, excludeSelf=False):
151        listOfNodes = []
152       
153        if makeAvailable == MAKE_AVAIL_ME:
154            listOfNodes = self.subsetQuery(cID=cID, cGrp=cGrp, structID=structID, nodeID=nodeID)
155        elif makeAvailable == MAKE_AVAIL_MY_GROUP:
156            listOfNodes = self.subsetQuery(cID=cID, cGrp=cGrp, structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_ME)
157            listOfNodes.extend(self.subsetQuery(cGrp=cGrp, structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_MY_GROUP))
158            listOfNodes.extend(self.subsetQuery(cGrp=cGrp, structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_ANY_GROUP))
159        elif makeAvailable == MAKE_AVAIL_ANY_GROUP:
160            listOfNodes = self.subsetQuery(cID=cID, cGrp=cGrp, structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_ME)
161            listOfNodes.extend(self.subsetQuery(cGrp=cGrp, structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_MY_GROUP))
162            listOfNodes.extend(self.subsetQuery(structID=structID, nodeID=nodeID, accessLevel=ADDRESSED_TO_ANY_GROUP))
163       
164        #print 'filter regs list %s' % listOfNodes
165
166        if excludeSelf:
167            for x in listOfNodes[:]:
168                if x[1] == cID and x[2] == cGrp:
169                    listOfNodes.remove(x)
170       
171        #print 'filter regs list after exclusion %s' % listOfNodes
172       
173        return self.makeInstanceList(listOfNodes)
174   
175    def filterRegsLockNotification(self, structID, nodeID, excludeID):
176        masterList = self.subsetQuery(structID=structID, nodeID=nodeID)
177        IDlist = []
178        for x in masterList[:]: #make a copy of the list and iterate over that because the iterated list is being mutated
179            if (x[1] in IDlist) or (x[1] == excludeID):
180                masterList.remove(x)
181            else:
182                IDlist.append(x[1])
183        #print "lock notification list %s" % masterList
184        return self.makeIDList(masterList)
185   
186    def isRegistered(self, cID, structID, nodeID):
187        list = self.subsetQuery(cID=cID, structID=structID, nodeID=nodeID)
188        if len(list) > 0:
189            return True
190        else:
191            return False
192       
193    def subsetQuery(self, cID=dontcare(), cGrp=dontcare(), cInst=dontcare(), structID=dontcare(), nodeID=dontcare(), accessLevel=dontcare()):
194        tempList = []
195        #print "master set reg %s" % self.regList
196        compTuple = (cInst, cID, cGrp, structID, nodeID, accessLevel)
197        for x in self.regList:
198            if x == compTuple:
199                tempList.append(x)
200        #print "subset reg %s" % tempList
201        return tempList
202   
203    def makeInstanceList(self, registrations):
204        tempInstList = []
205        for x in registrations:
206            tempInstList.append(x[0])
207        return tempInstList
208
209    def makeIDList(self, registrations):
210        tempInstList = []
211        for x in registrations:
212            tempInstList.append(x[1])
213        return tempInstList
214       
215    def disp(self):
216        print 'Registration List'
217        pprint.pprint(self.regList)
218   
219class lockMaster():
220   
221    def __init__(self):
222        self.lockList = []
223       
224    def addLock(self, cID, cGrp, structID, nodeID):
225        locks = self.subsetQuery(structID=structID, nodeID=nodeID)
226        if len(locks) > 0:
227            return SC_LOCK_EXISTS
228        lockToAdd = (cID, cGrp, structID, nodeID)
229        self.lockList.append(lockToAdd)
230        return SC_LOCK_SUCCESS
231           
232    def deleteLock(self, cID, cGrp, structID, nodeID):
233        locks = self.subsetQuery(structID=structID, nodeID=nodeID)
234        if len(locks) == 1:
235            if locks[0][0] == cID and locks[0][1] == cGrp:
236                self.lockList.remove(locks[0])
237                return SC_LOCK_SUCCESS
238            else:
239                return SC_LOCK_NOT_OWNER
240        elif len(locks) == 0:
241            return SC_LOCK_NOLOCK
242        elif len(locks) > 1:
243            raise Duplicate_Locks
244   
245    def isLocked(self, requester_cID, requester_cGrp, structID, nodeID):
246        locks = self.subsetQuery(structID=structID, nodeID=nodeID)
247        if len(locks) == 0:
248            return False
249        elif len(locks) == 1:
250            if locks[0][0] == requester_cID and locks[0][1] == requester_cGrp:
251                return False
252            else:
253                return True
254        else:
255            raise Duplicate_Locks
256   
257    def deleteLocksID(self, cID=dontcare(), cGrp=dontcare()):
258        locks = self.subsetQuery(cID=cID, cGrp=cGrp)
259        tempList = []
260        for x in locks:
261            tempList.append(x)
262            self.lockList.remove(x)
263        return tempList
264       
265
266    def subsetQuery(self, cID=dontcare(), cGrp=dontcare(), structID=dontcare(), nodeID=dontcare()):
267        tempList = []
268        #print "master set lock %s" % self.lockList
269        compTuple = (cID, cGrp, structID, nodeID)
270        for x in self.lockList:
271            if x == compTuple:
272                tempList.append(x)
273        #print "subset lock %s" % tempList
274        return tempList
275
276    def disp(self):
277        print 'Lock List:'
278        pprint.pprint(self.lockList)
279
280
281# Class definition to handle processing of all controller messages. In addition it keeps track of registrations and locks
282class ProcessControllerData():
283
284    # The WARPnetFactory instantiates this class and gives it its own instance. This lets the class send messages to specific controllers
285    def __init__(self, factory):
286        self.factory = factory
287        self.procFunc = {
288                        SC_REG_ADD: self.pc_reg,
289                        SC_REG_DEL: self.pc_dereg,
290                        SC_GRPID_ADD: self.pc_group_add,
291                        SC_GRPID_DEL: self.pc_group_del,
292                        SC_LOCK: self.pc_lock,
293                        SC_UNLOCK: self.pc_unlock,
294                        SC_CONNECT: self.pc_connect,
295                        SC_DATA_TO_NODE: self.pc_data,
296                        SC_EMULATOR_PRESENT: self.emulator,
297                        }; #register each of the handlers
298
299    # Input function that hands off the received message to the appropriate handler.
300    def controllerMsg(self, controllerID, data):
301        if controllerID != -1:
302            try:
303                self.procFunc[data['pktType']](controllerID, data)
304            except KeyError:
305                print "not defined yet %s" % data
306   
307    # Handler for the registration message
308    def pc_reg(self, controllerID, data):
309        if debugLevel > 0:
310            print "regAdd %s" % data
311        group = data['grp']
312        node = data['nodeID']
313        if self.factory.controllers.inGrp(controllerID, group):
314            if self.factory.nodeManager.isConnected(node, controllerID, group):
315                for x in data['structID']:
316                    status = self.factory.regManager.addRegistration(controllerID, group, self.factory.controllers.getInst(controllerID), x, node, data['accessLevel'])
317                self.sendStatus(controllerID, status, SC_REG_ADD)
318                for x in data['structID']:
319                    if self.factory.lockManager.isLocked(controllerID, group, x, node):
320                        self.sendLockNotification(controllerID, x, node)
321            else:
322                self.sendStatus(controllerID, SC_STAT_NOT_CONN_NODE, SC_REG_ADD)
323        else:
324            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_REG_ADD)
325
326   
327    def pc_dereg(self, controllerID, data):
328        if debugLevel > 0:
329            print "regDel %s" % data
330        group = data['grp']
331        if self.factory.controllers.inGrp(controllerID, group):
332            for x in data['structID']:
333                status = self.factory.regManager.deleteRegistration(controllerID, group, self.factory.controllers.getInst(controllerID), x, data['nodeID'])
334            self.sendStatus(controllerID, status, SC_REG_DEL)
335        else:
336            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_REG_DEL)
337   
338    def pc_unlock(self, controllerID, data):
339        if debugLevel > 0:
340            print "unlock %s" % data
341        group = data['grp']
342        structID = data['structID']
343        node = data['nodeID']
344        if self.factory.controllers.inGrp(controllerID, group):
345            status = self.factory.lockManager.deleteLock(controllerID, group, structID, node)
346            self.sendStatus(controllerID, status, SC_UNLOCK)
347            if status == SC_LOCK_SUCCESS:
348                notificationList = self.factory.regManager.filterRegsLockNotification(structID, node, controllerID)
349                for x in notificationList:
350                    self.sendUnlockNotification(x, structID, node)
351        else:
352            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_UNLOCK)
353
354
355    def pc_lock(self, controllerID, data):
356        if debugLevel > 0:
357            print "lock %s" % data
358        group = data['grp']
359        structID = data['structID']
360        node = data['nodeID']
361        if self.factory.controllers.inGrp(controllerID, group):
362            if self.factory.regManager.isRegistered(controllerID, structID, node):
363                status = self.factory.lockManager.addLock(controllerID, group, structID, node)
364                self.sendStatus(controllerID, status, SC_LOCK)
365                if status == SC_LOCK_SUCCESS:
366                    notificationList = self.factory.regManager.filterRegsLockNotification(structID, node, controllerID)
367                    for x in notificationList:
368                        self.sendLockNotification(x, structID, node)
369            else:
370                self.sendStatus(controllerID, SC_NOT_REGISTERED, SC_LOCK)
371        else:
372            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_LOCK)
373
374   
375    def pc_group_add(self, controllerID, data):
376        if debugLevel > 0:
377            print "grp_add %s" % data
378        group = data['grp']
379        statCode = self.factory.controllers.addGrp(controllerID, group)
380        self.sendStatus(controllerID, statCode, SC_GRPID_ADD)
381       
382    def pc_group_del(self, controllerID, data):
383        if debugLevel > 0:
384            print "grp_del %s" % data
385        group = data['grp']
386        if self.factory.controllers.inGrp(controllerID, group):
387            statCode = self.factory.controllers.delGrp(controllerID, group)
388            self.sendStatus(controllerID, statCode, SC_GRPID_DEL)
389            self.factory.regManager.deleteRegistrationGroup(controllerID, group)
390            self.factory.nodeManager.deleteGroup(controllerID, group)
391            #if self.factory.controllers.numGrps(controllerID) == 0:
392            #   self.deleteAllLocks(controllerID)   
393            listOfDeletedLocks = self.factory.lockManager.deleteLocksID(controllerID, group)
394            self.sendNotificationsForList(listOfDeletedLocks)
395        else:
396            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_GRPID_DEL)
397   
398
399    def pc_connect(self, controllerID, data):
400        if debugLevel > 0:
401            print "connect %s" % data
402        group = data['grp']
403        connectList = data['connList']
404        if self.factory.controllers.inGrp(controllerID, group):
405            for x in connectList:
406                node = x[0]
407                type = x[1]
408                if type == NODE_PCAP:
409                    statCode = self.factory.nodeManager.addConnection(node, type, controllerID, group)
410                elif type == NODE_SOCKET:
411                    statCode = self.factory.nodeManager.addConnection(node, type, controllerID, group, x[2])
412                else:
413                    statCode = SC_CONNECT_UNKNOWN_TYPE
414                if statCode != SC_CONNECT_SUCCESS:
415                    self.sendStatus(controllerID, statCode, SC_CONNECT)
416                    return
417            self.sendStatus(controllerID, statCode, SC_CONNECT)
418        else:
419            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_CONNECT)
420   
421   
422    def pc_data(self, controllerID, data):
423        if debugLevel > 0:
424            print "data to node %s" % data
425        group = data['grp']
426        structID = data['structID']
427        node = data['nodeID']
428        availability = data['access']
429        dataToSend = str(data['raw'])
430        if self.factory.controllers.inGrp(controllerID, group):
431            if self.factory.regManager.isRegistered(controllerID, structID, node):
432                if not self.factory.lockManager.isLocked(controllerID, group, structID, node):
433                    self.factory.nodeManager.sendToNode(controllerID, group, structID, node, dataToSend, availability, ETH_HEADER)
434                    notificationList = self.factory.regManager.filterRegs(controllerID, group, structID, node, availability, True)
435                    self.factory.sendData_cInsts(notificationList, SC_DATA_TO_NODE_REFLECT, {'structID': structID, 'nodeID': node, 'raw': dataToSend})
436                else:
437                    self.sendStatus(controllerID, SC_DATA_LOCKED, SC_DATA_TO_NODE)
438            else:
439                self.sendStatus(controllerID, SC_NOT_REGISTERED, SC_DATA_TO_NODE)
440        else:
441            self.sendStatus(controllerID, SC_STAT_GROUP_UNMATCHED, SC_DATA_TO_NODE)
442       
443       
444    def pc_data_from_node(self, controllerID, groupID, structID, nodeID, access, rawData, pcapts):
445        notificationList = self.factory.regManager.filterRegs(controllerID, groupID, structID, nodeID, access, False)
446        self.factory.sendData_cInsts(notificationList, SC_DATA_FROM_NODE, {'structID': structID, 'nodeID': nodeID, 'raw': rawData, 'pcapts': pcapts})
447   
448    def sendStatus(self, controllerID, statCode, reqType):
449        self.factory.sendData_cID(controllerID, SC_STAT, {'stat': statCode, 'reqType': reqType})
450       
451    def sendLockNotification(self, cID, structID, node):
452        self.factory.sendData_cID(cID, SC_LOCK_NOTIFICATION, {'structID': structID, 'nodeID': node})
453       
454    def sendUnlockNotification(self, cID, structID, node):
455        self.factory.sendData_cID(cID, SC_UNLOCK_NOTIFICATION, {'structID': structID, 'nodeID': node})
456   
457    def sendNotificationsForList(self, listOfDeletedLocks):
458        #listOfDeletedLocks = self.factory.lockManager.deleteLockAll(controllerID)
459        for lock in listOfDeletedLocks:
460            notificationList = self.factory.regManager.filterRegsLockNotification(lock[2], lock[3], lock[0])
461            for x in notificationList:
462                self.sendUnlockNotification(x, lock[2], lock[3])
463               
464    def emulator(self, controllerID, data):
465        import warpnet_server_azimuth
466        emulatorProcessor = warpnet_server_azimuth.processEmulatorMsg(self.factory, controllerID, data['ip'])
467        self.procFunc[SC_EMULATOR_MSG_TO_BOX] = emulatorProcessor.message
468   
469# Class definition of the protocol for WARPnetServer. It is an implementation of NetstringReceiver that does the type checking to ensure
470# that all received messages are valid netstrings
471class WARPnetServer(NetstringReceiver):
472
473    id = -1
474
475    # Once the data received, pass the data onto the processor to take action
476    def stringReceived(self, data):
477        dataDict = cPickle.loads(data)
478        self.factory.processor.controllerMsg(self.id, dataDict)
479   
480    # Once the connection is made, assign a new id, send it to the controller.
481    def connectionMade(self):
482        # Connection added to master list with no group affiliations
483        self.id = self.factory.controllers.addNewConn(self)
484        print "Connected to %d" % self.id
485
486       
487    # If the client disconnects, remove its id from the controllerList and all its registrations and locks
488    def connectionLost(self, reason):
489        # Delete the connection from the list structure. Delete any registrations and locks that may exist
490        listOfDeletedLocks = self.factory.lockManager.deleteLocksID(self.id)
491        self.factory.processor.sendNotificationsForList(listOfDeletedLocks)
492        self.factory.regManager.deleteRegistrationAll(self.id)
493        self.factory.controllers.deleteInst(self)
494        self.factory.nodeManager.deleteController(self.id)
495        print "Lost connection to %d" % self.id
496       
497       
498       
499# This is the factory class that is responsible for keeping track of all open connections to the controllers
500class WARPnetFactory(Factory):
501
502    protocol = WARPnetServer
503   
504    def __init__(self):
505        self.processor = ProcessControllerData(self)
506        self.controllers = controllerConnection()
507        self.lockManager = lockMaster()
508        self.regManager = registrationMaster()
509        self.nodeManager = NodeManager(self)
510   
511   
512    def sendDataToController(self, cInst, type, dataDict):
513        dictToSend = {'pktType': type}
514        dictToSend.update(dataDict)
515        cInst.sendString(cPickle.dumps(dictToSend))
516
517    def sendData_cID(self, controllerID, type, dataDict):
518        idInst = self.controllers.getInst(controllerID)
519        if idInst != None:
520            self.sendDataToController(idInst, type, dataDict)
521   
522    def sendData_cInsts(self, cInsts, type, dataDict):
523        for cInst in cInsts:
524            self.sendDataToController(cInst, type, dataDict)
525
526# creates instances for new nodes. Pcap connections make pcapHandler instances and add the node number to the pcapLoop. Socket connections
527# create a new factory/protocol instance that connects to it
528class NodeManager():
529
530    def __init__(self, factory):
531        # need to initialize the two pcap loops
532        self.factory = factory
533        self.pcapRead = pcapReadLoop()
534        pcapObj = self.pcapRead.getPcap()
535        self.pcapWrite = pcapWriteLoop(pcapObj)
536        self.existingConnections = dict()
537        reactor.callInThread(self.pcapRead.passToFactory)
538        reactor.callInThread(self.pcapRead.read)
539        reactor.callInThread(self.pcapWrite.write)
540       
541    def addConnection(self, node, nodeType, cID, cGrp, ip=None):
542        if node in self.existingConnections.keys():
543            if self.existingConnections[node][0] != nodeType:
544                return SC_CONNECT_TYPE_MISMATCH
545            if (cID, cGrp) not in self.existingConnections[node][1]:
546                self.existingConnections[node][1].append((cID, cGrp))
547        else:
548            self.existingConnections[node] = [nodeType, [(cID, cGrp)]]
549            if nodeType == NODE_PCAP:
550                instance = NodeConnectionClient(node, self, self.pcapWrite, self.pcapRead)
551            elif nodeType == NODE_SOCKET:
552                instance = NodeConnectionClient(node, self)
553                reactor.connectTCP(ip, 10101, instance)
554            self.existingConnections[node].append(instance)
555
556        return SC_CONNECT_SUCCESS
557
558    def isConnected(self, node, cID, cGrp):
559        if (node in self.existingConnections.keys()) and ((cID, cGrp) in self.existingConnections[node][1]):
560            return True
561        else:
562            return False
563
564    def sendToNode(self, cID, cGrp, structID, nodeID, data, access, ethernetType):
565        self.existingConnections[nodeID][2].sendStruct(cID, cGrp, structID, nodeID, data, access, ethernetType)
566       
567    def dataFromNode(self, cID, cGrp, structID, nodeID, access, rawData, pcapts):
568        self.factory.processor.pc_data_from_node(cID, cGrp, structID, nodeID, access, rawData, pcapts)
569
570    def deleteController(self, cID):
571        self.deleteSubset(cID=cID)
572   
573    def deleteGroup(self, cID, cGrp):
574        self.deleteSubset(cID=cID, cGrp=cGrp)
575
576    def deleteSubset(self, cID=dontcare(), cGrp=dontcare()):
577        for node in self.existingConnections.keys():
578            data = self.existingConnections[node]
579            for tuple in data[1][:]:
580                if tuple == (cID, cGrp):
581                    data[1].remove(tuple)
582            if len(data[1]) == 0:
583                data[2].deleteConnection()
584                del self.existingConnections[node]
585   
586    def disp(self):
587        print 'NodeManager connections:'
588        pprint.pprint(self.existingConnections)
589        print 'PcapRead mux:'
590        pprint.pprint(self.pcapRead.instMux)
591       
592class NodeSocketProtocol(Protocol):
593   
594    def connectionMade(self):
595        self.factory.connection = self
596   
597    def dataReceived(self, data):
598        structPkt = WARPnetStruct(data, False)
599        node = structPkt.getNode()
600        structID = structPkt.getStruct()
601        controller = structPkt.getController()
602        group = structPkt.getGroup()
603        access = structPkt.getAccess()
604        if debugLevel > 0:
605            print 'received data from node %d' % node
606        floatTimestamp = time.time()
607        stringTimestamp = '%.6f' % floatTimestamp
608        self.factory.rcvdValidStruct(controller, group, structID, node, access, structPkt.get_raw(), stringTimestamp, time.time())
609       
610    def sendData(self, data, ethernetType):
611        self.transport.write(data)
612
613    def disconnect(self):
614        self.transport.loseConnection()
615
616
617   
618class NodeConnectionClient(ClientFactory):
619    protocol = NodeSocketProtocol
620   
621    def __init__(self, id, manager, pcapWriteConn=None, pcapReadConn=None):
622        self.manager = manager
623        self.connection = pcapWriteConn
624        self.readLink = pcapReadConn
625        self.id = id
626        if self.readLink != None:
627            self.readLink.addToHandler(self.id, self)
628       
629    def sendStruct(self, cID, cGrp, structID, nodeID, data, access, ethernetType):
630        structPacket = WARPnetStruct(data, True)
631        structPacket.setController(cID)
632        structPacket.setGroup(cGrp)
633        structPacket.setAccess(access)
634        structPacket.setStruct(structID)
635        structPacket.setNode(nodeID)
636        self.connection.sendData(structPacket, ethernetType)
637       
638    def deleteConnection(self):
639        if self.readLink == None:
640            self.connection.disconnect()
641        else:
642            self.readLink.delFromHandler(self.id)
643           
644    def rcvdValidStruct(self, cID, cGrp, structID, nodeID, access, raw, pcapts, processTime):
645        #print 'valid struct'
646        self.manager.dataFromNode(cID, cGrp, structID, nodeID, access, raw, pcapts)
647        timePcapSend = time.time() - processTime
648
649
650# One instance running in a thread that calls the appropriate pcap handler when data is received
651class pcapReadLoop():
652
653    def __init__(self):
654        self.instMux = dict()
655        self.pcapObj = pcapy.open_live(ethInterface, 1500, True, 2)
656        self.pcapObj.setfilter('ether proto 0x%x' % ETH_RECEIVE)
657        self.counter = 0
658        self.recvdQueue = Queue.Queue()
659   
660    def getPcap(self):
661        return self.pcapObj
662       
663    def read(self):
664        while(reactor.running):
665            self.pcapObj.loop(1, self.dataRcvd)
666
667    def passToFactory(self):
668        while(reactor.running):
669            nextItem = self.recvdQueue.get()
670            header = nextItem[0]
671            data = nextItem[1]
672            pkt = WARPnetEthernetHeader(aBuffer=data)
673            off = pkt.get_header_size()
674            if pkt.getNumStructs() == 1:
675                self.counter += 1
676                structPkt = WARPnetStruct(data[off:], False)
677                node = structPkt.getNode()
678                structID = structPkt.getStruct()
679                controller = structPkt.getController()
680                group = structPkt.getGroup()
681                access = structPkt.getAccess()
682                if debugLevel > 0:
683                    print 'received data from node %d' % node
684                if node in self.instMux.keys():
685                    pcapTimestamp = header.getts()
686                    floatTimestamp = pcapTimestamp[0] + pcapTimestamp[1]/1000000.0
687                    stringTimestamp = '%.6f' % floatTimestamp
688                    reactor.callFromThread(self.instMux[node].rcvdValidStruct, controller, group, structID, node, access, structPkt.get_raw(), stringTimestamp, nextItem[2])
689                    timePcapFactory = time.time() - nextItem[2]
690
691    def dataRcvd(self, header, data): #has to be made complex. parse impacket, find the node and mux on that to send it along
692        self.recvdQueue.put((header, data, time.time()))
693        #print 'received data'
694#       pkt = WARPnetEthernetHeader(aBuffer=data)
695#       off = pkt.get_header_size()
696#       #print struct.unpack_from('6B', data[off:])
697#       if pkt.getNumStructs() == 1:
698#           self.counter += 1
699#           structPkt = WARPnetStruct(data[off:], False)
700#           node = structPkt.getNode()
701#           structID = structPkt.getStruct()
702#           controller = structPkt.getController()
703#           group = structPkt.getGroup()
704#           access = structPkt.getAccess()
705#           if debugLevel > 0:
706#               print 'received data from node %d' % node
707#           if node in self.instMux.keys():
708#               pcapTimestamp = header.getts()
709#               floatTimestamp = pcapTimestamp[0] + pcapTimestamp[1]/1000000.0
710#               stringTimestamp = '%.6f' % floatTimestamp
711#               reactor.callFromThread(self.instMux[node].rcvdValidStruct, controller, group, structID, node, access, structPkt.get_raw(), stringTimestamp)
712
713        #printData = "Source Address: %s" % pkt.get_ether_shost()
714        #print printData
715        #reactor.callFromThread(cmdreader.print_stdio, printData)
716
717    def addToHandler(self, nodeID, instance):
718        self.instMux[nodeID] = instance
719   
720    def delFromHandler(self, nodeID):
721        del self.instMux[nodeID]
722       
723class pcapWriteLoop():
724
725    def __init__(self, pcapObj):
726        self.pcapObj = pcapObj
727        self.writeQueue = Queue.Queue()
728   
729    def sendData(self, structPacket, ethernetType): # has to be much more complex. must create the impacket and send full structure
730        ethPkt = WARPnetEthernetHeader()
731        ethPkt.set_ether_shost(localMacAddress)
732        ethPkt.set_ether_dhost([0xff,0xff,0xff,0xff,0xff,0xff])
733        ethPkt.set_ether_type(ethernetType)
734        ethPkt.setPktLength(18+structPacket.get_header_size())
735        ethPkt.setNumStructs(1)
736        ethPkt.setSeqNo(0)
737        ethPkt.contains(structPacket)
738        self.writeQueue.put(ethPkt)
739   
740    def write(self):
741        while(reactor.running):
742            #if (not self.writeQueue.empty()):
743            nextItem = self.writeQueue.get()
744            self.pcapObj.inject(nextItem.get_packet())
745            if debugLevel > 0:
746                print "injecting"
747
748class WARPnetEthernetHeader(ImpactPacket.Ethernet):
749    def __init__(self, aBuffer=None):
750        ImpactPacket.Header.__init__(self, 18)
751        if(aBuffer):
752            self.load_header(aBuffer)
753
754    def get_header_size(self):
755        "Return size of Ethernet header"
756        return 18
757       
758    def setPktLength(self, len):
759        self.set_word(14, len)
760   
761    def setNumStructs(self, num):
762        self.set_byte(16, num)
763       
764    def getNumStructs(self):
765        return self.get_byte(16)
766   
767    def setSeqNo(self, num):
768        self.set_byte(17, num)
769   
770    def get_packet(self):
771        return ImpactPacket.Header.get_packet(self)
772
773class WARPnetStruct(ImpactPacket.Header):
774    def __init__(self, aBuffer, fromController):
775        self.fromController = fromController
776        if self.fromController:
777            self.lengthOfPacket = len(aBuffer) + 4
778        else:
779            self.lengthOfPacket = len(aBuffer)
780        ImpactPacket.Header.__init__(self, self.lengthOfPacket)
781        if self.fromController:
782            self.load_header('\x00\x00\x00\x00' + aBuffer)
783        else:
784            self.load_header(aBuffer)
785
786    def setController(self, id):
787        self.set_byte(0, id)
788
789    def getController(self):
790        return self.get_byte(0)
791   
792    def setGroup(self, grp):
793        self.set_byte(1, grp)
794
795    def getGroup(self):
796        return self.get_byte(1)
797       
798    def setAccess(self, level):
799        self.set_byte(2, level)
800
801    def getAccess(self):
802        return self.get_byte(2)
803       
804    def setStruct(self, structid):
805        self.set_byte(4, structid)
806
807    def getStruct(self):
808        return self.get_byte(4)
809   
810    def setNode(self, node):
811        self.set_byte(5, node)
812
813    def getNode(self):
814        return self.get_byte(5)
815
816    def get_header_size(self):
817        return self.lengthOfPacket
818       
819    def get_raw(self):
820        return self.get_packet()[4:self.lengthOfPacket]
821   
822    def printraw(self):
823        for x in range(0,6):
824            print self.get_byte(x)
825
826
827class CmdReader(LineReceiver):
828    from os import linesep as delimiter
829   
830    def lineReceived(self, line):
831        if line == 'r':
832            factory.regManager.disp()
833        elif line == 'l':
834            factory.lockManager.disp()
835        elif line == 'd':
836            factory.controllers.disp()
837        elif line == 'a':
838            factory.regManager.disp()
839            factory.lockManager.disp()
840            factory.controllers.disp()
841            factory.nodeManager.disp()
842        elif line == 'c':
843            print factory.nodeManager.pcapRead.counter
844            print factory.nodeManager.pcapRead.recvdQueue.qsize()
845        elif line == 't':
846            print 'Pcap to Factory: %.6f' % timePcapFactory
847            print 'Pcap to Client: %.6f' % timePcapSend
848        elif line == 'q':
849            reactor.stop()
850            ethPkt = WARPnetEthernetHeader()
851            ethPkt.set_ether_shost(localMacAddress)
852            ethPkt.set_ether_dhost([0xff,0xff,0xff,0xff,0xff,0xff])
853            ethPkt.set_ether_type(ETH_RECEIVE)
854            ethPkt.setPktLength(18)
855            ethPkt.setNumStructs(0)
856            ethPkt.setSeqNo(255)
857            structPkt = WARPnetStruct('123456789ABCDEF', False)
858            structPkt.setController(2)
859            structPkt.setGroup(3)
860            structPkt.setAccess(0)
861            factory.nodeManager.pcapWrite.sendData(structPkt, ETH_RECEIVE)
862
863
864# This function is called when warpnet_server.py is executed as main. This will set up the
865if __name__ == "__main__":
866    import sys
867    if len(sys.argv) == 4:
868        ethInterface = sys.argv[1]
869        serverPort = int(sys.argv[2])
870        mac = sys.argv[3].split(':')
871        for x in range(0, len(mac)):
872            localMacAddress[x] = int('0x'+mac[x], 16)
873    elif len(sys.argv) == 5:
874        ethInterface = sys.argv[1]
875        serverPort = int(sys.argv[2])
876        mac = sys.argv[3].split(':')
877        for x in range(0, len(mac)):
878            localMacAddress[x] = int('0x'+mac[x], 16)
879        debugLevel = sys.argv[4]
880    else:
881        sys.exit()
882
883
884# Create a stdio listener object
885stdio.StandardIO(CmdReader())
886
887# Create a WARPnetFactory class that listens on the input server port and any interface.
888factory = WARPnetFactory()
889reactor.listenTCP(serverPort, factory)
890
891print "Started WARPnet Server. Listening on port %d." % serverPort
892
893# Start the event loop
894reactor.run()
Note: See TracBrowser for help on using the repository browser.