123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #!/usr/bin/python3
- import logging
- import serial
- import argparse
- import sys
- import os
- import time
- logging.basicConfig(format='%(name)s: %(levelname)s: %(message)s', level=logging.INFO)
- logger = logging.getLogger('SA828')
- CTCSSCDCSS_CODES = [
- "None", "67.0", "71.9", "74.4", "77.0", "79.7", "82.5", "85.4", "88.5",
- "91.5", "94.8", "97.4", "100.0", "103.5", "107.2", "110.9", "114.8", "118.8",
- "123.0", "127.3", "131.8", "136.5", "141.3", "146.2", "151.4", "156.7",
- "162.2", "167.9", "173.8", "179.9", "186.2", "192.8", "203.5", "210.7",
- "218.1", "225.7", "233.6", "241.8", "250.3",
- "023I", "025I", "026I", "031I", "032I", "036I", "043I", "047I", "051I", "053I", "054I",
- "065I", "071I", "072I", "073I", "074I", "114I", "115I", "116I", "125I", "131I", "132I",
- "134I", "143I", "152I", "155I", "156I", "162I", "165I", "172I", "174I", "205I", "223I",
- "226I", "243I", "244I", "245I", "251I", "261I", "263I", "265I", "271I", "306I", "311I",
- "315I", "331I", "343I", "346I", "351I", "364I", "365I", "371I", "411I", "412I", "413I",
- "423I", "431I", "432I", "445I", "464I", "465I", "466I", "503I", "506I", "516I", "532I",
- "546I", "565I", "606I", "612I", "624I", "627I", "631I", "632I", "654I", "662I", "664I",
- "703I", "712I", "723I", "731I", "732I", "734I", "743I", "754I",
- "023I", "025I", "026I", "031I", "032I", "036I", "043I", "047I", "051I", "053I", "054I",
- "065I", "071I", "072I", "073I", "074I", "114I", "115I", "116I", "125I", "131I", "132I",
- "134I", "143I", "152I", "155I", "156I", "162I", "165I", "172I", "174I", "205I", "223I",
- "226I", "243I", "244I", "245I", "251I", "261I", "263I", "265I", "271I", "306I", "311I",
- "315I", "331I", "343I", "346I", "351I", "364I", "365I", "371I", "411I", "412I", "413I",
- "423I", "431I", "432I", "445I", "464I", "465I", "466I", "503I", "506I", "516I", "532I",
- "546I", "565I", "606I", "612I", "624I", "627I", "631I", "632I", "654I", "662I", "664I",
- "703I", "712I", "723I", "731I", "732I", "734I", "743I", "754I"
- ]
- DEFAULT_BAUDRATE = 9600
- BAUD_RATES = [300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]
- class SA828:
- EOL = "\r\n"
- RETOK = "OK\r\n"
- RETERROR = "ERROR\r\n"
- READ_TIMEOUT = 3
- def __init__(self, port=None, baud=DEFAULT_BAUDRATE):
- self.serial = None
- try:
- self.serial = serial.Serial(port=port,
- baudrate=baud,
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- bytesize=serial.EIGHTBITS,
- timeout=self.READ_TIMEOUT)
- logger.debug(self.serial)
- except serial.SerialException as err:
- logger.debug(err)
- if not isinstance(self.serial, serial.Serial):
- raise IOError('Error openning the serial port')
- def close(self):
- self.serial.close()
- def send(self, *args):
- data = ','.join(args)
- logger.debug('Sending: %s', data)
- data = bytes(data + self.EOL, 'ascii')
- try:
- self.serial.write(data)
- except serial.SerialException as err:
- logger.error(err)
- def readline(self):
- try:
- line = self.serial.readline()
- except serial.SerialException as err:
- logger.warning(err)
- return None
- try:
- line = line.decode('ascii')
- except UnicodeDecodeError:
- logger.debug(line)
- logger.error('Character decode error: Check your baudrate')
- logger.debug('Received: %s', line.rstrip())
- return line.rstrip()
- def version(self):
- self.send("\x41\x41\x46\x41\x41")
- time.sleep(0.5)
- reply = self.readline()
- return reply
- def read_parameter(self):
- self.send("AAFA1")
- time.sleep(0.5)
- reply = self.readline()
- return reply
- def reset(self):
- self.send("AAFA2")
- time.sleep(0.5)
- reply = self.readline()
- return reply
- def set_parameter(self, data):
- self.send("AAFA3" + data)
- time.sleep(3)
- reply = self.readline()
- return reply
- def main():
-
- parser = argparse.ArgumentParser(description="generate configuration for switch port")
- parser.add_argument("--debug", action="store_true", default=False)
- parser.add_argument("--port", type=str, default="/dev/ttyUSB0", help="Serial port [default: linux console port]")
- parser.add_argument('--speed', type=int, choices=BAUD_RATES, default=DEFAULT_BAUDRATE, help="Connection speed")
- subparsers = parser.add_subparsers()
- p_readversion = subparsers.add_parser("version", help='Read the version of the radio')
- p_readversion.set_defaults(func="version")
- p_reset = subparsers.add_parser("reset", help='Reset the radio')
- p_reset.set_defaults(func="reset")
- p_readparameter = subparsers.add_parser("readparameter", help='Read the full configuration of the radio')
- p_readparameter.set_defaults(func="readparameter")
- p_setparameter = subparsers.add_parser("setparameter", help='Set the configuration of the radio')
- p_setparameter.set_defaults(func="setparameter")
- p_setparameter.add_argument("--channel", required=True, help="Channel number, [1-16]")
- p_setparameter.add_argument("--txfreq",required=True, help="Transmit frequency : format '145.4500'")
- p_setparameter.add_argument("--rxfreq",required=True, help="Receive frequency : format '145.4500'")
- p_setparameter.add_argument("--txctcss",required=True, help="Transmit CTCSS : format 'None' or '88.5' or 023I or 023N")
- p_setparameter.add_argument("--rxctcss",required=True, help="Receive CTCSS : format 'None' or '88.5' or 023I or 023N")
- p_setparameter.add_argument("--squelch",required=True, help="Set the Squelch : [0-8]")
- opts = parser.parse_args()
- if not hasattr(opts, 'func'):
- print('sa818: error: the following arguments are required: {radio,volume,filters,version}\n'
- 'use --help for more informatiion',
- file=sys.stderr)
- sys.exit(os.EX_USAGE)
- try:
- radio = SA828(opts.port, opts.speed)
- except (IOError, SystemError) as err:
- logger.error(err)
- sys.exit(os.EX_IOERR)
- if opts.debug:
- logger.setLevel(logging.DEBUG)
- logger.debug(opts)
- if opts.func == 'version':
- logger.info('Version: %s', radio.version())
- if opts.func == 'reset':
- logger.info('Resetting the radio')
- radio.reset()
- if opts.func == 'readparameter':
- params=radio.read_parameter().split(",")
- params[0]=params[0][2:]
- print("Configuration:")
- print("==============")
- for i in range(0, 16):
- if i == 0:
- print("Channel %s:\tTx : %s\tRx : %s" % (i+1, params[i], params[i+1]))
- else:
- print("Channel %s:\tTx : %s\tRx : %s" % (i+1, params[i*2], params[i*2+1]))
- print("CTCS/CDCS :\tTx : %s\t Rx : %s" % (CTCSSCDCSS_CODES[int(params[32])], CTCSSCDCSS_CODES[int(params[33])]))
- print("Squelch :\t%s" % params[34])
- if opts.func == 'setparameter':
- logger.info('Setting the radio')
- params=radio.read_parameter().split(",")
- params[0]=params[0][2:]
- channel = {}
- for i in range(0, 16):
- if i == 0:
- channel[i+1]=[params[i],params[i+1]]
- else:
- channel[i+1]=[params[i*2],params[i*2+1]]
- channel[int(opts.channel)]=[opts.txfreq,opts.rxfreq]
- # Creation de la chaine complete
- for i in range(0, 16):
- if i == 0:
- tosend="%s,%s," % (channel[i+1][0],channel[i+1][1])
- else:
- tosend=tosend+"%s,%s," % (channel[i+1][0],channel[i+1][1])
- #search value in array and return position
- for i in range(0,len(CTCSSCDCSS_CODES)):
- if CTCSSCDCSS_CODES[i] == opts.txctcss:
- txctcss=i
- break
- else:
- txctcss=0
- for i in range(0,len(CTCSSCDCSS_CODES)):
- if CTCSSCDCSS_CODES[i] == opts.rxctcss:
- rxctcss=i
- break
- else:
- rxctcss=0
- tosend=tosend+"%03d,%03d,%s" % (txctcss,rxctcss,opts.squelch)
- radio.set_parameter(tosend)
- if __name__ == "__main__":
- main()
|