1 | # -*- coding: utf-8 -*- |
---|
2 | """ |
---|
3 | ------------------------------------------------------------------------------ |
---|
4 | Mango 802.11 Reference Design Experiments Framework |
---|
5 | - Programmable Attenuator Control |
---|
6 | ------------------------------------------------------------------------------ |
---|
7 | License: Copyright 2019 Mango Communications, Inc. All rights reserved. |
---|
8 | Use and distribution subject to terms in LICENSE.txt |
---|
9 | ------------------------------------------------------------------------------ |
---|
10 | |
---|
11 | """ |
---|
12 | |
---|
13 | class ProgAttenController(object): |
---|
14 | atten_dev = None |
---|
15 | |
---|
16 | def __init__(self, serial_port=None, serial_num=None): |
---|
17 | import serial |
---|
18 | |
---|
19 | ser_port = self.find_atten_dev_port(serial_port, serial_num) |
---|
20 | |
---|
21 | if(ser_port is not None): |
---|
22 | self.atten_dev = serial.Serial(port=ser_port, baudrate=115200, timeout=0.25) |
---|
23 | |
---|
24 | # Disable console mode (prevents output echoing input) |
---|
25 | self.write_cmd('CONSOLE DISABLE') |
---|
26 | |
---|
27 | # Reset the attenuator state and clear input/output buffers |
---|
28 | self.write_cmd('*RST') |
---|
29 | self.write_cmd('*CLS') |
---|
30 | |
---|
31 | return |
---|
32 | else: |
---|
33 | raise Exception('ERROR: No programmable attenuator found - check your USB connections and driver!') |
---|
34 | |
---|
35 | |
---|
36 | def find_atten_dev_port(self, serial_port=None, serial_num=None): |
---|
37 | from serial.tools.list_ports import comports |
---|
38 | import re |
---|
39 | |
---|
40 | # comports returns iterable of 3-tuples: |
---|
41 | # 0: Port name ('COMX' on windows, ??? on OS X) |
---|
42 | # 1: Device description |
---|
43 | # 2: Device info |
---|
44 | |
---|
45 | # We're looking for: (where X in COMX is arbitrary integer) |
---|
46 | # ('COMX', 'Weinschel 4205 USB COM Port (COMX)', 'USB VID:PID=25EA:106D SNR=000000'), |
---|
47 | |
---|
48 | for (port_name, dev_desc, dev_info) in comports(): |
---|
49 | # Apply port name filter, if specified |
---|
50 | if(serial_port is not None and port_name != serial_port): |
---|
51 | # Keep looking |
---|
52 | continue |
---|
53 | |
---|
54 | desc_test = dev_desc.split('Weinschel 4205 USB COM Port') |
---|
55 | # desc_test = dev_desc.split('Weinschel USB COM Port') |
---|
56 | info_test = re.search('USB VID:PID=(....):(....) SNR=(.+)', dev_info) |
---|
57 | # info_test = re.search('USB VID:PID=(....):(....) SER=(.+)', dev_info) |
---|
58 | |
---|
59 | |
---|
60 | if(info_test): |
---|
61 | info_test = info_test.groups() |
---|
62 | else: |
---|
63 | info_test = [] |
---|
64 | |
---|
65 | if(desc_test[0] == '' and len(info_test) == 3): |
---|
66 | # Found an attenuator |
---|
67 | |
---|
68 | # Check the serial number, if specified |
---|
69 | if(serial_num is not None and serial_num != info_test[2]): |
---|
70 | # Keep looking |
---|
71 | continue |
---|
72 | |
---|
73 | # Future extension: return serial number too |
---|
74 | # ser_num = info_test[2] #keep as string; device supports alphanumeric ser nums |
---|
75 | print('Found attenuator on {0} ({1} {2})'.format(port_name, dev_desc, dev_info)) |
---|
76 | return port_name |
---|
77 | |
---|
78 | # Did not find attenuator |
---|
79 | return None |
---|
80 | |
---|
81 | |
---|
82 | def read(self): |
---|
83 | return self.atten_dev.read(200) |
---|
84 | |
---|
85 | |
---|
86 | def readline(self): |
---|
87 | return self.atten_dev.readline() |
---|
88 | |
---|
89 | |
---|
90 | def write(self, s): |
---|
91 | return self.atten_dev.write(s) |
---|
92 | |
---|
93 | |
---|
94 | def write_cmd(self, cmd): |
---|
95 | # Strip any stray line breaks, append one CR |
---|
96 | cmd = ('{0}\n').format(str(cmd).replace('\n', '').replace('\r', '')) |
---|
97 | |
---|
98 | # Write command to serial port |
---|
99 | stat = self.atten_dev.write(cmd) |
---|
100 | |
---|
101 | # Check that full command was written |
---|
102 | if(stat != len(cmd)): |
---|
103 | raise Exception('ERROR: Failed to write full command to serial port ({0} < {1})!'.format(stat, len(cmd))) |
---|
104 | |
---|
105 | |
---|
106 | def read_resp(self): |
---|
107 | # Attenuator responses are terminated by newlines |
---|
108 | # It also replies with line breaks to (some?) commands, even when CONSOLE is off |
---|
109 | |
---|
110 | # Loop until: |
---|
111 | # Actual response is received (more than just line breaks) |
---|
112 | # serial.readline() times out (indicated by zero-length return) |
---|
113 | while True: |
---|
114 | r = self.readline() |
---|
115 | if(len(r) > 0): |
---|
116 | #Check if response is just line breaks |
---|
117 | r = r.replace('\n', '').replace('\r', '') |
---|
118 | if(len(r) == 0): |
---|
119 | continue |
---|
120 | else: |
---|
121 | #Got valid (more than line breaks) response |
---|
122 | return r |
---|
123 | else: |
---|
124 | return None |
---|
125 | |
---|
126 | |
---|
127 | def set_atten(self, atten): |
---|
128 | # Round user supplied attenuation to nearest 0.5 dB in [0.0, 95.5] |
---|
129 | atten_actual = float(atten) |
---|
130 | |
---|
131 | # Constrain to supported attenuation range |
---|
132 | if(atten_actual > 95.5): |
---|
133 | atten_actual = 95.5 |
---|
134 | elif(atten_actual < 0): |
---|
135 | atten_actual = 0.0 |
---|
136 | |
---|
137 | # Round to nearest 0.1 dB |
---|
138 | atten_actual = round(float(atten_actual), 1) |
---|
139 | |
---|
140 | atten_str = '{0:2.1f}'.format(atten_actual) |
---|
141 | |
---|
142 | num_attempts = 5 |
---|
143 | |
---|
144 | while num_attempts >= 0: |
---|
145 | # Send command to attenuator |
---|
146 | self.write_cmd('ATTN ' + atten_str) |
---|
147 | |
---|
148 | # Read back the attenuation |
---|
149 | self.write_cmd('ATTN?') |
---|
150 | r = self.read_resp() |
---|
151 | |
---|
152 | if(str(r) == str(atten_str)): |
---|
153 | return |
---|
154 | else: |
---|
155 | num_attempts -= 1 |
---|
156 | |
---|
157 | raise Exception('ERROR: failed to set attenuation ({0} != {1})'.format(atten_str, r)) |
---|
158 | |
---|
159 | |
---|
160 | def close(self): |
---|
161 | self.atten_dev.close() |
---|
162 | |
---|
163 | # End class |
---|