193 lines
5.2 KiB
Python
Executable File
193 lines
5.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# intellectual property is bullshit bgdc
|
|
|
|
PORT = "/dev/tty.usbserial-2120"
|
|
BAUD = 31250
|
|
|
|
import base64
|
|
import binascii
|
|
import serial
|
|
import sys
|
|
|
|
class DummySerial:
|
|
def __init__(self, *args, **kwargs):
|
|
self.fifo = []
|
|
self.timeout = kwargs.get("timeout")
|
|
pass
|
|
|
|
def write(self, b):
|
|
print(binascii.hexlify(b).decode("ascii"))
|
|
for i in b:
|
|
self.fifo.append(i)
|
|
|
|
def read(self, n):
|
|
data = bytes(self.fifo[:n])
|
|
del self.fifo[:n]
|
|
return data
|
|
|
|
class hex32arg:
|
|
@classmethod
|
|
def encode(cls, cmd):
|
|
return b"%08X" % int(cmd[0], 16)
|
|
@classmethod
|
|
def decode(cls, cmd):
|
|
return (b"0x" + cmd).decode("ascii")
|
|
|
|
class dataarg:
|
|
@classmethod
|
|
def encode(cls, cmd):
|
|
data = binascii.unhexlify(cmd[0])
|
|
cksum = ~sum(data) & 0x7F
|
|
return bytes([cksum]) + base64.b64encode(data)
|
|
@classmethod
|
|
def decode(cls, cmd):
|
|
cksum_received = cmd[0]
|
|
data = base64.b64decode(cmd[1:])
|
|
cksum_actual = ~sum(data) & 0x7F
|
|
|
|
if cksum_received != cksum_actual:
|
|
print("warning: checksum fail")
|
|
|
|
return binascii.hexlify(data).decode("ascii")
|
|
|
|
SYSEX = [
|
|
(["storage", "response"], [0x01, 0x01], dataarg),
|
|
(["storage", "read"], [0x01, 0x02], hex32arg),
|
|
(["storage", "openwr"], [0x01, 0x03], hex32arg),
|
|
(["storage", "wrdata"], [0x01, 0x04], dataarg),
|
|
(["storage", "commit"], [0x01, 0x05], None),
|
|
(["storage", "cancel"], [0x01, 0x06], None),
|
|
|
|
(["status", "ack"], [0x7F, 0x01], None),
|
|
(["status", "fmt_err"], [0x7F, 0x02], None),
|
|
(["status", "unknown"], [0x7F, 0x03], None),
|
|
(["status", "busy"], [0x7F, 0x04], None),
|
|
(["status", "ioerr"], [0x7F, 0x05], None),
|
|
(["status", "cksum"], [0x7F, 0x06], None),
|
|
]
|
|
|
|
HLCOMMANDS = [
|
|
]
|
|
|
|
class Receiver:
|
|
def __init__(self, port):
|
|
self.s = port
|
|
|
|
def rx_one(self, timeout=0.1):
|
|
"""Receive a single MIDI packet. Returns it as bytes. On a timeout,
|
|
whatever was received is returned."""
|
|
|
|
timeout, self.s.timeout = self.s.timeout, timeout
|
|
|
|
buf = b""
|
|
while not buf or not (buf[0] & 0x80):
|
|
byte = self.s.read(1)
|
|
if not byte:
|
|
break
|
|
buf += byte
|
|
|
|
# Terminating conditions depend on the packet type. The following
|
|
# initial bytes specify a three-byte packet:
|
|
# 8x 9x Ax Bx Cx Ex
|
|
#
|
|
# The following specifies a two-byte packet:
|
|
# Dx
|
|
#
|
|
# F0 specifies a sysex packet, terminated by F7
|
|
#
|
|
# Other Fx packets are currently returned immediately after 1 byte.
|
|
|
|
if not buf:
|
|
term = lambda b: True
|
|
elif (buf[0] & 0xF0) in (0x80, 0x90, 0xA0, 0xB0, 0xC0, 0xE0):
|
|
term = lambda b: len(b) == 3
|
|
elif (buf[0] & 0xF0) == 0xD0:
|
|
term = lambda b: len(b) == 2
|
|
elif buf[0] == 0xF0:
|
|
term = lambda b: b[-1] == 0xF7
|
|
else:
|
|
term = lambda b: True
|
|
|
|
while not term(buf):
|
|
byte = self.s.read(1)
|
|
if not byte:
|
|
break
|
|
buf += byte
|
|
|
|
timeout, self.s.timeout = self.s.timeout, timeout
|
|
|
|
return buf
|
|
|
|
def rx_until_status(self, timeout=0.1):
|
|
"""Receive packets, yielding them, until either a "status" response is
|
|
received or the interface times out."""
|
|
|
|
while True:
|
|
packet = self.rx_one(timeout=timeout)
|
|
is_status = packet and packet.startswith(b"\xF0\x7D\x7F")
|
|
yield packet
|
|
if is_status or not packet:
|
|
break
|
|
|
|
def main(argv):
|
|
s = serial.Serial(PORT, BAUD, timeout=0.1)
|
|
#s = DummySerial()
|
|
|
|
rx = Receiver(s)
|
|
|
|
for i in argv[1:]:
|
|
send(s, i)
|
|
|
|
for resp in rx.rx_until_status():
|
|
print(parse_response(resp))
|
|
|
|
def send(s, cmd):
|
|
if cmd.startswith(":"):
|
|
# sysex - split on :
|
|
cmdargs = cmd[1:].split(":")
|
|
|
|
for key, data, converter in SYSEX:
|
|
if cmdargs[:len(key)] == key:
|
|
if converter is not None:
|
|
data += converter.encode(cmdargs[len(key):])
|
|
|
|
s.write(b"\xF0\x7D" + bytes(data) + b"\xF7")
|
|
break
|
|
|
|
else:
|
|
print("error: cannot find command:", cmd)
|
|
|
|
elif cmd.startswith("!"):
|
|
cmdargs = cmd[1:].split(":")
|
|
for name, func in HLCOMMANDS:
|
|
if name == cmdargs[0]:
|
|
func(cmdargs[1:])
|
|
break
|
|
else:
|
|
print("error: cannot find command:", cmdargs[0])
|
|
|
|
def parse_response(r):
|
|
while r and ((r[0] & 0x80) == 0x00):
|
|
r.pop(0)
|
|
|
|
if not r:
|
|
return None
|
|
|
|
if r.startswith(b"\xF0\x7D"):
|
|
end = r.find(b"\xF7")
|
|
if end < 0:
|
|
return None
|
|
r = r[:end]
|
|
|
|
for key, data, converter in SYSEX:
|
|
if r[2:].startswith(bytes(data)):
|
|
arg = r[2 + len(key):]
|
|
if converter is not None:
|
|
arg_decoded = ":" + converter.decode(arg)
|
|
else:
|
|
arg_decoded = ""
|
|
return ":" + ":".join(key) + arg_decoded
|
|
|
|
if __name__ == "__main__":
|
|
main(sys.argv)
|