# -*- coding: utf-8 -*-
"""
------------------------------------------------------------------------------
Mango 802.11 Reference Design Experiments Framework
 - Programmable Attenuator Control
------------------------------------------------------------------------------
License: Copyright 2019 Mango Communications, Inc. All rights reserved.
         Use and distribution subject to terms in LICENSE.txt
------------------------------------------------------------------------------

"""

class ProgAttenController(object):
    """Serial-port controller for a Weinschel 4205 USB programmable attenuator.

    On construction the attached serial ports are scanned for a device whose
    description starts with 'Weinschel 4205 USB COM Port'. The matching port
    is opened at 115200 baud with a 0.25 s read timeout, console echo is
    disabled and the device is reset.

    Commands are sent as ASCII text terminated by a single line feed;
    responses are read line by line and returned as text.

    The object can be used as a context manager, in which case the serial
    port is closed on exit.
    """

    # Open serial device handle; stays None until __init__ succeeds
    atten_dev = None

    def __init__(self, serial_port=None, serial_num=None):
        """Locate, open and initialize the attenuator.

        Args:
            serial_port: Optional port name (e.g. 'COM3'); when given, only
                that port is considered.
            serial_num: Optional serial-number string; when given, only a
                device reporting that serial number is accepted.

        Raises:
            Exception: If no matching attenuator is found, or if one of the
                initial configuration commands cannot be written.
        """
        import serial

        ser_port = self.find_atten_dev_port(serial_port, serial_num)

        if ser_port is None:
            raise Exception('ERROR: No programmable attenuator found - check your USB connections and driver!')

        self.atten_dev = serial.Serial(port=ser_port, baudrate=115200, timeout=0.25)

        try:
            # Disable console mode (prevents output echoing input)
            self.write_cmd('CONSOLE DISABLE')

            # Reset the attenuator state and clear input/output buffers
            self.write_cmd('*RST')
            self.write_cmd('*CLS')
        except Exception:
            # Do not leak the open port when initialization fails part-way
            self.atten_dev.close()
            self.atten_dev = None
            raise

    def __enter__(self):
        """Return self so the controller can be used in a `with` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port; exceptions from the block are not suppressed."""
        self.close()
        return False

    def find_atten_dev_port(self, serial_port=None, serial_num=None):
        """Return the name of the first serial port that looks like the attenuator.

        Args:
            serial_port: Optional port name filter; other ports are skipped.
            serial_num: Optional serial-number filter, compared as a string
                against the value following 'SNR=' in the device info.

        Returns:
            The port name of the first matching device, or None if no
            device matches.
        """
        from serial.tools.list_ports import comports
        import re

        # Each entry from comports() unpacks as a 3-tuple:
        #   0: Port name ('COMX' on Windows)
        #   1: Device description
        #   2: Device info
        #
        # We're looking for (where X in COMX is an arbitrary integer):
        #   ('COMX', 'Weinschel 4205 USB COM Port (COMX)', 'USB VID:PID=25EA:106D SNR=000000')
        #
        # NOTE(review): earlier revisions carried commented-out variants that
        # matched 'Weinschel USB COM Port' and 'SER=' instead - presumably for
        # other driver / pyserial versions. They are not enabled here.
        for (port_name, dev_desc, dev_info) in comports():
            # Apply port name filter, if specified
            if serial_port is not None and port_name != serial_port:
                continue

            # startswith() instead of split(...)[0] == '' so that an empty
            # description string is not mistaken for a match
            if not dev_desc.startswith('Weinschel 4205 USB COM Port'):
                continue

            info_match = re.search('USB VID:PID=(....):(....) SNR=(.+)', dev_info)
            if info_match is None:
                continue

            # Found an attenuator - check the serial number, if specified.
            # Kept as a string; the original notes say the device supports
            # alphanumeric serial numbers.
            if serial_num is not None and serial_num != info_match.group(3):
                continue

            print('Found attenuator on {0} ({1} {2})'.format(port_name, dev_desc, dev_info))
            return port_name

        # Did not find attenuator
        return None

    def read(self):
        """Read up to 200 bytes from the device and return them unmodified."""
        return self.atten_dev.read(200)

    def readline(self):
        """Read one line from the device and return it unmodified."""
        return self.atten_dev.readline()

    def write(self, s):
        """Write `s` to the device as-is and return the device's write() result."""
        return self.atten_dev.write(s)

    def write_cmd(self, cmd):
        """Send one command line to the attenuator.

        Any line breaks inside `cmd` are removed and a single line feed is
        appended. The text is encoded to ASCII bytes before being written,
        because the serial port's write() requires bytes on Python 3.

        Args:
            cmd: Command text (any object convertible with str(); bytes are
                decoded as ASCII first).

        Raises:
            Exception: If fewer bytes were written than the command length.
        """
        # On Python 3 str(b'...') would produce "b'...'"; decode bytes instead.
        # On Python 2 bytes is str, so this branch is skipped.
        if isinstance(cmd, bytes) and not isinstance(cmd, str):
            cmd = cmd.decode('ascii')

        # Strip any stray line breaks, append one LF
        line = '{0}\n'.format(str(cmd).replace('\n', '').replace('\r', ''))
        payload = line.encode('ascii')

        # Write command to serial port
        stat = self.atten_dev.write(payload)

        # Check that full command was written
        if stat != len(payload):
            raise Exception('ERROR: Failed to write full command to serial port ({0} < {1})!'.format(stat, len(payload)))

    def read_resp(self):
        """Read the next non-empty response line from the attenuator.

        Attenuator responses are terminated by newlines. The device also
        replies with bare line breaks to (some?) commands, even when CONSOLE
        is off, so lines that contain only line breaks are skipped.

        Returns:
            The response text with all CR/LF characters removed, or None
            when readline() returns nothing (read timeout).
        """
        while True:
            raw = self.readline()

            # Zero-length return indicates that readline() timed out
            if len(raw) == 0:
                return None

            # readline() yields bytes on Python 3; convert to text before
            # stripping line breaks. On Python 2 the value is already str.
            if not isinstance(raw, str):
                raw = raw.decode('ascii', 'replace')

            resp = raw.replace('\n', '').replace('\r', '')

            # Got valid (more than line breaks) response
            if len(resp) > 0:
                return resp

    def set_atten(self, atten):
        """Set the attenuation and verify it by reading the value back.

        The requested value is clamped to [0.0, 95.5] dB and rounded to one
        decimal place. The 'ATTN <value>' command is followed by an 'ATTN?'
        query; the set is considered successful when the response equals the
        formatted value exactly. Up to 6 tries are made in total.

        NOTE(review): the original header comment spoke of rounding to the
        nearest 0.5 dB, but the code rounds to 0.1 dB. The 0.1 dB behavior is
        kept; presumably a value the hardware cannot represent will fail the
        readback check - confirm against the device documentation.

        Args:
            atten: Desired attenuation in dB (anything accepted by float()).

        Raises:
            ValueError: If `atten` is NaN.
            Exception: If the readback never matches the requested value.
        """
        import math

        atten_actual = float(atten)

        # NaN passes both range checks below and would be sent as 'ATTN nan'
        if math.isnan(atten_actual):
            raise ValueError('ERROR: attenuation must be a number, got NaN')

        # Constrain to supported attenuation range
        atten_actual = min(max(atten_actual, 0.0), 95.5)

        # Round to nearest 0.1 dB
        atten_actual = round(atten_actual, 1)

        atten_str = '{0:2.1f}'.format(atten_actual)

        # One initial try plus five retries (same count as the original loop)
        max_tries = 6
        r = None

        for _ in range(max_tries):
            # Send command to attenuator
            self.write_cmd('ATTN ' + atten_str)

            # Read back the attenuation
            self.write_cmd('ATTN?')
            r = self.read_resp()

            if r == atten_str:
                return

        raise Exception('ERROR: failed to set attenuation ({0} != {1})'.format(atten_str, r))

    def close(self):
        """Close the serial port; does nothing if no device was opened."""
        if self.atten_dev is not None:
            self.atten_dev.close()

# End class