mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 16:27:06 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			534 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			534 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| import fcntl
 | |
| import IOCTL
 | |
| from IOCTL import *
 | |
| import sys
 | |
| import struct
 | |
| import select
 | |
| import posix
 | |
| import time
 | |
| 
 | |
# Path of the serial device that VCR.__init__ passes to initline().
DEVICE = '/dev/ttyd2'
 | |
| 
 | |
class UnixFile:
	"""Small file-like wrapper around a raw posix file descriptor.

	Only the operations used by this module are provided; all I/O goes
	straight through the posix module without any buffering.
	"""

	def open(self, name, mode):
		"""Open *name* with the posix.open() flags *mode* and return self."""
		descriptor = posix.open(name, mode)
		self.fd = descriptor
		return self

	def read(self, len):
		"""Return up to *len* bytes read from the descriptor."""
		return posix.read(self.fd, len)

	def write(self, data):
		"""Write *data* to the descriptor; the byte count is discarded."""
		posix.write(self.fd, data)

	def flush(self):
		"""Do nothing: writes are unbuffered."""
		return None

	def fileno(self):
		"""Return the underlying file descriptor."""
		return self.fd

	def close(self):
		"""Close the underlying file descriptor."""
		posix.close(self.fd)
 | |
| 
 | |
def packttyargs(*args):
	"""Pack terminal settings into the string passed to the tty ioctls.

	Accepts either six positional arguments
	(iflag, oflag, cflag, lflag, line, chars) or a single tuple or list
	holding those six values.  *chars* is a sequence of one-character
	strings.

	Returns struct.pack('hhhhb', ...) of the five numeric fields followed
	by the control characters.

	Raises TypeError if the arguments have an unsupported shape or type
	(string exceptions, used here before, cannot be raised by Python 2.6
	and later).  A wrong number of fields raises ValueError from the
	unpacking.
	"""
	if not args:
		raise TypeError('Incorrect argtype for packttyargs')
	first = args[0]
	if isinstance(first, (tuple, list)):
		if len(args) != 1:
			raise TypeError('Only 1 argument expected')
		fields = first
	elif isinstance(first, int):
		fields = args
	else:
		# Previously this case fell through to an unbound-local error.
		raise TypeError('Incorrect argtype for packttyargs')
	iflag, oflag, cflag, lflag, line, chars = fields
	header = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line)
	# header[:0] is an empty string of the same type as the packed header,
	# so the join yields a matching string type.
	return header + header[:0].join(chars)
 | |
| 
 | |
def nullttyargs():
	"""Return a packed tty argument string with every field zeroed.

	All four flag words and the line discipline are 0 and each of the
	IOCTL.NCCS control characters is NUL.
	"""
	blank_chars = IOCTL.NCCS * ['\0']
	return packttyargs(0, 0, 0, 0, 0, blank_chars)
 | |
| 
 | |
def unpackttyargs(str):
	"""Split a packed tty argument string back into its fields.

	The last IOCTL.NCCS characters of *str* are the control characters;
	everything before them is unpacked with the 'hhhhb' struct format.

	Returns (iflag, oflag, cflag, lflag, line, chars) with *chars* as a
	list of single characters.
	"""
	nchars = IOCTL.NCCS
	header, tail = str[:-nchars], str[-nchars:]
	iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', header)
	return (iflag, oflag, cflag, lflag, line, list(tail))
 | |
| 
 | |
def initline(name):
	"""Open the serial device *name* and configure it for talking to the VCR.

	The current settings are fetched with the TCGETA ioctl, adjusted and
	written back with TCSETA:

	- input: INPCK, ISTRIP, INLCR, IUCLC, IXON and IXOFF are cleared;
	- output: OPOST is cleared;
	- control: replaced by B9600|CS8|CREAD|CLOCAL;
	- local: ISIG, ICANON, ECHO and TOSTOP are cleared;
	- VMIN is set to 1 and VTIME to 0.

	Returns a pair (input file, output file); both are the same UnixFile
	object.

	If configuring the line fails the descriptor is closed before the
	exception is re-raised, so a failed call does not leak it.
	"""
	# NOTE(review): mode 2 is presumably O_RDWR on the target system.
	fp = UnixFile().open(name, 2)
	try:
		fd = fp.fileno()
		rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs())
		iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv)
		iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF)
		oflag = oflag & ~OPOST
		cflag = B9600|CS8|CREAD|CLOCAL
		lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP)
		chars[VMIN] = chr(1)
		chars[VTIME] = chr(0)
		arg = packttyargs(iflag, oflag, cflag, lflag, line, chars)
		fcntl.ioctl(fd, IOCTL.TCSETA, arg)
	except:
		# Cleanup only: release the descriptor, then propagate unchanged.
		fp.close()
		raise
	return fp, fp
 | |
| 
 | |
#
# Exception raised by this module for protocol and usage errors.
#
# This used to be the string exception 'VCR.error'.  String exceptions
# cannot be raised by Python 2.6 and later, so it is now a class; the
# existing "raise error, message" and "except error" forms work with it.
#
class error(Exception):
	pass

# Commands/replies:
COMPLETION = '\x01'
ACK = '\x0a'
NAK = '\x0b'

# Digits are transmitted as NUMBER_N + digit; ENTER terminates a
# numeric argument sequence.
NUMBER_N = 0x30
ENTER = '\x40'

# Prefix bytes used to build the two-byte commands below.
EXP_7 = '\xde'
EXP_8 = '\xdf'

CL = '\x56'
CTRL_ENABLE = EXP_8 + '\xc6'
SEARCH_DATA = EXP_8 + '\x93'
ADDR_SENSE = '\x60'

PLAY = '\x3a'
STOP = '\x3f'
EJECT = '\x2a'
FF = '\xab'
REW = '\xac'
STILL = '\x4f'
STEP_FWD = '\x2b'    # Was: '\xad'
FM_SELECT = EXP_8 + '\xc8'
FM_STILL = EXP_8 + '\xcd'
PREVIEW = EXP_7 + '\x9d'
REVIEW = EXP_7 + '\x9e'
DM_OFF = EXP_8 + '\xc9'
DM_SET = EXP_8 + '\xc4'
FWD_SHUTTLE = '\xb5'
REV_SHUTTLE = '\xb6'
EM_SELECT = EXP_8 + '\xc0'
N_FRAME_REC = EXP_8 + '\x92'
SEARCH_PREROLL = EXP_8 + '\x90'
EDIT_PB_STANDBY = EXP_8 + '\x96'
EDIT_PLAY = EXP_8 + '\x98'
AUTO_EDIT = EXP_7 + '\x9c'

# Edit in-point commands.
IN_ENTRY = EXP_7 + '\x90'
IN_ENTRY_RESET = EXP_7 + '\x91'
IN_ENTRY_SET = EXP_7 + '\x98'
IN_ENTRY_INC = EXP_7 + '\x94'
IN_ENTRY_DEC = EXP_7 + '\x95'
IN_ENTRY_SENSE = EXP_7 + '\x9a'

# Edit out-point commands.
OUT_ENTRY = EXP_7 + '\x92'
OUT_ENTRY_RESET = EXP_7 + '\x93'
OUT_ENTRY_SET = EXP_7 + '\x99'
OUT_ENTRY_INC = EXP_7 + '\x96'
# NOTE(review): this is the same value as IN_ENTRY_SET; by analogy with
# the IN_ENTRY_* codes it may have been meant to be '\x97'.  Left
# unchanged -- confirm against the VCR's protocol documentation.
OUT_ENTRY_DEC = EXP_7 + '\x98'
OUT_ENTRY_SENSE = EXP_7 + '\x9b'

MUTE_AUDIO = '\x24'
MUTE_AUDIO_OFF = '\x25'
MUTE_VIDEO = '\x26'
MUTE_VIDEO_OFF = '\x27'
MUTE_AV = EXP_7 + '\xc6'
MUTE_AV_OFF = EXP_7 + '\xc7'

# Set to a true value to print every byte sent to and received from the VCR.
DEBUG = 0
 | |
| 
 | |
| class VCR:
 | |
| 	def __init__(self):
 | |
| 		self.ifp, self.ofp = initline(DEVICE)
 | |
| 		self.busy_cmd = None
 | |
| 		self.async = 0
 | |
| 		self.cb = None
 | |
| 		self.cb_arg = None
 | |
| 
 | |
| 	def _check(self):
 | |
| 		if self.busy_cmd:
 | |
| 			raise error, 'Another command active: '+self.busy_cmd
 | |
| 
 | |
| 	def _endlongcmd(self, name):
 | |
| 		if not self.async:
 | |
| 			self.waitready()
 | |
| 			return 1
 | |
| 		self.busy_cmd = name
 | |
| 		return 'VCR BUSY'
 | |
| 
 | |
| 	def fileno(self):
 | |
| 		return self.ifp.fileno()
 | |
| 
 | |
| 	def setasync(self, async):
 | |
| 		self.async = async
 | |
| 
 | |
| 	def setcallback(self, cb, arg):
 | |
| 		self.setasync(1)
 | |
| 		self.cb = cb
 | |
| 		self.cb_arg = arg
 | |
| 
 | |
| 	def poll(self):
 | |
| 		if not self.async:
 | |
| 			raise error, 'Can only call poll() in async mode'
 | |
| 		if not self.busy_cmd:
 | |
| 			return
 | |
| 		if self.testready():
 | |
| 			if self.cb:
 | |
| 				apply(self.cb, (self.cb_arg,))
 | |
| 
 | |
| 	def _cmd(self, cmd):
 | |
| 		if DEBUG:
 | |
| 			print '>>>',`cmd`
 | |
| 		self.ofp.write(cmd)
 | |
| 		self.ofp.flush()
 | |
| 
 | |
| 	def _waitdata(self, len, tout):
 | |
| 		rep = ''
 | |
| 		while len > 0:
 | |
| 			if tout == None:
 | |
| 				ready, d1, d2 = select.select( \
 | |
| 					  [self.ifp], [], [])
 | |
| 			else:
 | |
| 				ready, d1, d2 = select.select( \
 | |
| 					  [self.ifp], [], [], tout)
 | |
| 			if ready == []:
 | |
| ##				if rep:
 | |
| ##					print 'FLUSHED:', `rep`
 | |
| 				return None
 | |
| 			data = self.ifp.read(1)
 | |
| 			if DEBUG:
 | |
| 				print '<<<',`data`
 | |
| 			if data == NAK:
 | |
| 				return NAK
 | |
| 			rep = rep + data
 | |
| 			len = len - 1
 | |
| 		return rep
 | |
| 
 | |
| 	def _reply(self, len):
 | |
| 		data = self._waitdata(len, 10)
 | |
| 		if data == None:
 | |
| 			raise error, 'Lost contact with VCR'
 | |
| 		return data
 | |
| 
 | |
| 	def _getnumber(self, len):
 | |
| 		data = self._reply(len)
 | |
| 		number = 0
 | |
| 		for c in data:
 | |
| 			digit = ord(c) - NUMBER_N
 | |
| 			if digit < 0 or digit > 9:
 | |
| 				raise error, 'Non-digit in number'+`c`
 | |
| 			number = number*10 + digit
 | |
| 		return number
 | |
| 
 | |
| 	def _iflush(self):
 | |
| 		dummy = self._waitdata(10000, 0)
 | |
| ##		if dummy:
 | |
| ##			print 'IFLUSH:', dummy
 | |
| 
 | |
| 	def simplecmd(self,cmd):
 | |
| 		self._iflush()
 | |
| 		for ch in cmd:
 | |
| 			self._cmd(ch)
 | |
| 			rep = self._reply(1)
 | |
| 			if rep == NAK:
 | |
| 				return 0
 | |
| 			elif rep <> ACK:
 | |
| 				raise error, 'Unexpected reply:' + `rep`
 | |
| 		return 1
 | |
| 
 | |
| 	def replycmd(self, cmd):
 | |
| 		if not self.simplecmd(cmd[:-1]):
 | |
| 			return 0
 | |
| 		self._cmd(cmd[-1])
 | |
| 
 | |
| 	def _number(self, number, digits):
 | |
| 		if number < 0:
 | |
| 			raise error, 'Unexpected negative number:'+ `number`
 | |
| 		maxnum = pow(10, digits)
 | |
| 		if number > maxnum:
 | |
| 			raise error, 'Number too big'
 | |
| 		while maxnum > 1:
 | |
| 			number = number % maxnum
 | |
| 			maxnum = maxnum / 10
 | |
| 			digit = number / maxnum
 | |
| 			ok = self.simplecmd(chr(NUMBER_N + digit))
 | |
| 			if not ok:
 | |
| 				raise error, 'Error while transmitting number'
 | |
| 
 | |
| 	def initvcr(self, *optarg):
 | |
| 		timeout = None
 | |
| 		if optarg <> ():
 | |
| 			timeout = optarg[0]
 | |
| 			starttime = time.time()
 | |
| 		self.busy_cmd = None
 | |
| 		self._iflush()
 | |
| 		while 1:
 | |
| ##			print 'SENDCL'
 | |
| 			self._cmd(CL)
 | |
| 			rep = self._waitdata(1, 2)
 | |
| ##			print `rep`
 | |
| 			if rep in ( None, CL, NAK ):
 | |
| 				if timeout:
 | |
| 					if time.time() - starttime > timeout:
 | |
| 						raise error, \
 | |
| 							  'No reply from VCR'
 | |
| 				continue
 | |
| 			break
 | |
| 		if rep <> ACK:
 | |
| 			raise error, 'Unexpected reply:' + `rep`
 | |
| 		dummy = self.simplecmd(CTRL_ENABLE)
 | |
| 
 | |
| 	def waitready(self):
 | |
| 		rep = self._waitdata(1, None)
 | |
| 		if rep == None:
 | |
| 			raise error, 'Unexpected None reply from waitdata'
 | |
| 		if rep not in  (COMPLETION, ACK):
 | |
| 			self._iflush()
 | |
| 			raise error, 'Unexpected waitready reply:' + `rep`
 | |
| 		self.busy_cmd = None
 | |
| 
 | |
| 	def testready(self):
 | |
| 		rep = self._waitdata(1, 0)
 | |
| 		if rep == None:
 | |
| 			return 0
 | |
| 		if rep not in  (COMPLETION, ACK):
 | |
| 			self._iflush()
 | |
| 			raise error, 'Unexpected waitready reply:' + `rep`
 | |
| 		self.busy_cmd = None
 | |
| 		return 1
 | |
| 
 | |
| 	def play(self): return self.simplecmd(PLAY)
 | |
| 	def stop(self): return self.simplecmd(STOP)
 | |
| 	def ff(self):   return self.simplecmd(FF)
 | |
| 	def rew(self):  return self.simplecmd(REW)
 | |
| 	def eject(self):return self.simplecmd(EJECT)
 | |
| 	def still(self):return self.simplecmd(STILL)
 | |
| 	def step(self): return self.simplecmd(STEP_FWD)
 | |
| 
 | |
| 	def goto(self, (h, m, s, f)):
 | |
| 		if not self.simplecmd(SEARCH_DATA):
 | |
| 			return 0
 | |
| 		self._number(h, 2)
 | |
| 		self._number(m, 2)
 | |
| 		self._number(s, 2)
 | |
| 		self._number(f, 2)
 | |
| 		if not self.simplecmd(ENTER):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('goto')
 | |
| 
 | |
| 	# XXXX TC_SENSE doesn't seem to work
 | |
| 	def faulty_where(self):
 | |
| 		self._check()
 | |
| 		self._cmd(TC_SENSE)
 | |
| 		h = self._getnumber(2)
 | |
| 		m = self._getnumber(2)
 | |
| 		s = self._getnumber(2)
 | |
| 		f = self._getnumber(2)
 | |
| 		return (h, m, s, f)
 | |
| 
 | |
| 	def where(self):
 | |
| 		return self.addr2tc(self.sense())
 | |
| 
 | |
| 	def sense(self):
 | |
| 		self._check()
 | |
| 		self._cmd(ADDR_SENSE)
 | |
| 		num = self._getnumber(5)
 | |
| 		return num
 | |
| 
 | |
| 	def addr2tc(self, num):
 | |
| 		f = num % 25
 | |
| 		num = num / 25
 | |
| 		s = num % 60
 | |
| 		num = num / 60
 | |
| 		m = num % 60
 | |
| 		h = num / 60
 | |
| 		return (h, m, s, f)
 | |
| 
 | |
| 	def tc2addr(self, (h, m, s, f)):
 | |
| 		return ((h*60 + m)*60 + s)*25 + f
 | |
| 
 | |
| 	def fmmode(self, mode):
 | |
| 		self._check()
 | |
| 		if mode == 'off':
 | |
| 			arg = 0
 | |
| 		elif mode == 'buffer':
 | |
| 			arg = 1
 | |
| 		elif mode == 'dnr':
 | |
| 			arg = 2
 | |
| 		else:
 | |
| 			raise error, 'fmmode arg should be off, buffer or dnr'
 | |
| 		if not self.simplecmd(FM_SELECT):
 | |
| 			return 0
 | |
| 		self._number(arg, 1)
 | |
| 		if not self.simplecmd(ENTER):
 | |
| 			return 0
 | |
| 		return 1
 | |
| 
 | |
| 	def mute(self, mode, value):
 | |
| 		self._check()
 | |
| 		if mode == 'audio':
 | |
| 			cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO)
 | |
| 		elif mode == 'video':
 | |
| 			cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO)
 | |
| 		elif mode == 'av':
 | |
| 			cmds = (MUTE_AV_OFF, MUTE_AV)
 | |
| 		else:
 | |
| 			raise error, 'mute type should be audio, video or av'
 | |
| 		cmd = cmds[value]
 | |
| 		return self.simplecmd(cmd)
 | |
| 
 | |
| 	def editmode(self, mode):
 | |
| 		self._check()
 | |
| 		if mode == 'off':
 | |
| 			a0 = a1 = a2 = 0
 | |
| 		elif mode == 'format':
 | |
| 			a0 = 4
 | |
| 			a1 = 7
 | |
| 			a2 = 4
 | |
| 		elif mode == 'asmbl':
 | |
| 			a0 = 1
 | |
| 			a1 = 7
 | |
| 			a2 = 4
 | |
| 		elif mode == 'insert-video':
 | |
| 			a0 = 2
 | |
| 			a1 = 4
 | |
| 			a2 = 0
 | |
| 		else:
 | |
| 			raise 'editmode should be off,format,asmbl or insert-video'
 | |
| 		if not self.simplecmd(EM_SELECT):
 | |
| 			return 0
 | |
| 		self._number(a0, 1)
 | |
| 		self._number(a1, 1)
 | |
| 		self._number(a2, 1)
 | |
| 		if not self.simplecmd(ENTER):
 | |
| 			return 0
 | |
| 		return 1
 | |
| 
 | |
| 	def autoedit(self):
 | |
| 		self._check()
 | |
| 		return self._endlongcmd(AUTO_EDIT)
 | |
| 
 | |
| 	def nframerec(self, num):
 | |
| 		if not self.simplecmd(N_FRAME_REC):
 | |
| 			return 0
 | |
| 		self._number(num, 4)
 | |
| 		if not self.simplecmd(ENTER):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('nframerec')
 | |
| 
 | |
| 	def fmstill(self):
 | |
| 		if not self.simplecmd(FM_STILL):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('fmstill')
 | |
| 
 | |
| 	def preview(self):
 | |
| 		if not self.simplecmd(PREVIEW):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('preview')
 | |
| 
 | |
| 	def review(self):
 | |
| 		if not self.simplecmd(REVIEW):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('review')
 | |
| 
 | |
| 	def search_preroll(self):
 | |
| 		if not self.simplecmd(SEARCH_PREROLL):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('search_preroll')
 | |
| 
 | |
| 	def edit_pb_standby(self):
 | |
| 		if not self.simplecmd(EDIT_PB_STANDBY):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('edit_pb_standby')
 | |
| 
 | |
| 	def edit_play(self):
 | |
| 		if not self.simplecmd(EDIT_PLAY):
 | |
| 			return 0
 | |
| 		return self._endlongcmd('edit_play')
 | |
| 
 | |
| 	def dmcontrol(self, mode):
 | |
| 		self._check()
 | |
| 		if mode == 'off':
 | |
| 			return self.simplecmd(DM_OFF)
 | |
| 		if mode == 'multi freeze':
 | |
| 			num = 1000
 | |
| 		elif mode == 'zoom freeze':
 | |
| 			num = 2000
 | |
| 		elif mode == 'digital slow':
 | |
| 			num = 3000
 | |
| 		elif mode == 'freeze':
 | |
| 			num = 4011
 | |
| 		else:
 | |
| 			raise error, 'unknown dmcontrol argument: ' + `mode`
 | |
| 		if not self.simplecmd(DM_SET):
 | |
| 			return 0
 | |
| 		self._number(num, 4)
 | |
| 		if not self.simplecmd(ENTER):
 | |
| 			return 0
 | |
| 		return 1
 | |
| 
 | |
| 	def fwdshuttle(self, num):
 | |
| 		if not self.simplecmd(FWD_SHUTTLE):
 | |
| 			return 0
 | |
| 		self._number(num, 1)
 | |
| 		return 1
 | |
| 
 | |
| 	def revshuttle(self, num):
 | |
| 		if not self.simplecmd(REV_SHUTTLE):
 | |
| 			return 0
 | |
| 		self._number(num, 1)
 | |
| 		return 1
 | |
| 
 | |
| 	def getentry(self, which):
 | |
| 		self._check()
 | |
| 		if which == 'in':
 | |
| 			cmd = IN_ENTRY_SENSE
 | |
| 		elif which == 'out':
 | |
| 			cmd = OUT_ENTRY_SENSE
 | |
| 		self.replycmd(cmd)
 | |
| 		h = self._getnumber(2)
 | |
| 		m = self._getnumber(2)
 | |
| 		s = self._getnumber(2)
 | |
| 		f = self._getnumber(2)
 | |
| 		return (h, m, s, f)
 | |
| 
 | |
| 	def inentry(self, arg):
 | |
| 		return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET, \
 | |
| 			  IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC))
 | |
| 
 | |
| 	def outentry(self, arg):
 | |
| 		return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET, \
 | |
| 			  OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC))
 | |
| 
 | |
| 	def ioentry(self, arg, (Load, Clear, Set, Inc, Dec)):
 | |
| 		self._check()
 | |
| 		if type(arg) == type(()):
 | |
| 			h, m, s, f = arg
 | |
| 			if not self.simplecmd(Set):
 | |
| 				return 0
 | |
| 			self._number(h,2)
 | |
| 			self._number(m,2)
 | |
| 			self._number(s,2)
 | |
| 			self._number(f,2)
 | |
| 			if not self.simplecmd(ENTER):
 | |
| 				return 0
 | |
| 			return 1
 | |
| 		elif arg == 'reset':
 | |
| 			cmd = Clear
 | |
| 		elif arg == 'load':
 | |
| 			cmd = Load
 | |
| 		elif arg == '+':
 | |
| 			cmd = Inc
 | |
| 		elif arg == '-':
 | |
| 			cmd = Dec
 | |
| 		else:
 | |
| 			raise error, 'Arg should be +,-,reset,load or (h,m,s,f)'
 | |
| 		return self.simplecmd(cmd)
 | |
| 
 | |
| 	def cancel(self):
 | |
| 		d = self.simplecmd(CL)
 | |
| 		self.busy_cmd = None
 | 
