안드로이드에서는 adb getevent를 이용하여 이벤트를 녹화 할 수 있다고 검색결과가 여기저기 나와있었습니다.
흥미로운 사실은 sendevent 를 이용하여 그대로 재생도 할 수 있는것 같아서 아 뭔가 만들수 있겠구나 해서 작업을 시작하였습니다.
http://cafe.naver.com/ruletheskycafe/418610
이벤트 녹화는 두개의 파일을 이용하여 제작 하였고 recevt.py adblibs.py 파일을 사용하며,
이벤트 재생은 playevt.py adblibs.py 파일을 사용하도록 만들었습니다.
스크립트 재생시 반복과 function 처리가 구현할 수 있도록 대충 만들어 90%정도의 완성율을 보여 테스트 해보았습니다.
하지만 결과는 대 실패였습니다
sendevent 의 event를 보내는 timming이 너무 느려서 터치 이벤트를 보내기에는 무리가 있었습니다.
교훈 : 미리 prototype을 만들어 테스트 해볼 것, 그리고 인터넷 자료를 너무 믿지 말자.
작업하다가 중단한 소스를 공개합니다.
모델에 따라서 track_devices_str 변수는 변경해 줘야 합니다. track할 device이름들입니다.
이벤트의 녹화 및 app change 되는 로그를 동시에 남기도록 하였습니다.
recevt.py
# # Record event # 2015.1.7 # import sys import Queue import adblibs import threading import datetime import time debug_mode = True track_devices_str = ["sec_touchscreen","sec_touchkey","hall","gpio_keys.9"] cmdline_getevent = "adb shell getevent" cmdline_geteventlog = "adb shell logcat -b events" ts = time.time() save_file_name = "eventrec_"+datetime.datetime.fromtimestamp(ts).strftime('%Y%m%d_%H%M%S')+".rec" print "*************************************************************" print "** Record event system since 2015 7 Jan, powered by Python **" print "*************************************************************" print "save file name:"+save_file_name print "Track devices:" print track_devices_str devices_str = adblibs.GetAdbDevices() if len(devices_str) == 0 : print "No device found" exit(-1) file = open(save_file_name, 'w') for _str in devices_str: print "device:"+_str adblibs.ClearAdbCache() _devices = adblibs.GetInputDevices(track_devices_str) msg_queue = Queue.Queue() th = threading.Thread(target=adblibs.GetEventThreadWithTrackInfo, args=(cmdline_getevent,_devices,msg_queue,debug_mode)) th.start() the = threading.Thread(target=adblibs.GetEventLogThread, args=(cmdline_geteventlog,msg_queue,debug_mode)) the.start() first_time = False print "Main Thread" while True: while not msg_queue.empty(): line = msg_queue.get() if len(line) > 0 : if not first_time : first_time = True first_time_stamp = datetime.datetime.now() time_diff = 0 time_diff_str = '0:00:00.000000' else: time_diff = datetime.datetime.now()-first_time_stamp first_time_stamp = datetime.datetime.now() time_diff_str = str(time_diff) if len(time_diff_str) < 8 : time_diff_str = time_diff_str+".000000" line_split = line.split(" ",1) print "%s %s %s"%(line_split[0], time_diff_str ,line_split[1]) file.write("%s %s %s\r"%(line_split[0], time_diff_str ,line_split[1])) if msg_queue.empty() : file.flush(); print "Out" th.join() the.join()
adblibs.py
''' adb library ''' import subprocess import threading import Queue import datetime import time import re import subprocess from ctypes import c_short import time, os, signal, threading class AsynchronousFileReader(threading.Thread): ''' Helper class to implement asynchronous reading of a file in a separate thread. Pushes read lines on a queue to be consumed in another thread. ''' def __init__(self, fd, queue): assert isinstance(queue, Queue.Queue) assert callable(fd.readline) threading.Thread.__init__(self) self._fd = fd self._queue = queue def run(self): '''The body of the tread: read lines and put them on the queue.''' for line in iter(self._fd.readline, ''): self._queue.put(line) def eof(self): '''Check whether there is no more content to expect.''' return not self.is_alive() and self._queue.empty() def adb_log_connect(cmdline): # You'll need to add any command line arguments here. process = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Launch the asynchronous readers of the process' stdout. stdout_queue = Queue.Queue() stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue) stdout_reader.start() return [stdout_queue,stdout_reader,process.stdin] def GetEventThread(cmdline_getevent,track_devices_str,msg_queue,debug_mode): ts = time.time() all_device_list = [] all_device_name_list = [] track_devices_name = [] for str in track_devices_str: track_devices_name.append("nodev") stdout_queue_event,stdout_reader_event, dummy = adb_log_connect(cmdline_getevent) device_detect_mode = True while not stdout_reader_event.eof(): while not stdout_queue_event.empty(): line = stdout_queue_event.get() y = line.split(); if len(y) < 0 : continue if device_detect_mode : if "add device" in line: x = line.split() device_name = x[3]; if "name:" in line: x = line.split() device_str = x[1].strip("\"") all_device_list.append(device_str) all_device_name_list.append(device_name) if device_str in track_devices_str : track_devices_name[track_devices_str.index(device_str)] = device_name if debug_mode : print device_name + " : " + device_str if "/dev/" in y[0]: if debug_mode : print "--------------------------" print "track list" print track_devices_str print track_devices_name print "--------------------------" print "full list" print all_device_list print all_device_name_list print "--------------------------" device_detect_mode = False y2 = y[0] y2 = y2.strip(":") y2 = y2.strip() if y2 in track_devices_name: line = line.replace(y[0],track_devices_str[track_devices_name.index(y2)]) x = line.split() line = "INPUT %s %d %d %d" %(x[0],int(x[1],16),int(x[2],16),c_short(int(x[3],16)).value) msg_queue.put(line) def GetEventThreadWithTrackInfo(cmdline_getevent,track_dev_str,msg_queue,debug_mode): stdout_queue_event,stdout_reader_event,dummy = adb_log_connect(cmdline_getevent) while not stdout_reader_event.eof(): while not stdout_queue_event.empty(): line = stdout_queue_event.get() y = line.split(); if len(y) < 0 : continue y2 = y[0] y2 = y2.strip(":") y2 = y2.strip() if y2 in track_dev_str[0]: line = line.replace(y[0],track_dev_str[1][track_dev_str[0].index(y2)]) x = line.split() line = "INPUT %s %d %d %d" %(x[0],int(x[1],16),int(x[2],16),c_short(int(x[3],16)).value) msg_queue.put(line) def GetEventLogThread(cmdline_getevent,msg_queue,debug_mode): stdout_queue_event,stdout_reader_event,dummy = adb_log_connect(cmdline_getevent) RE_COM = re.compile("I/am_on_resume_called\([\s]*\d+\)\:\s\[\d,([a-zA-Z0-9_.]+)\]") while not stdout_reader_event.eof(): while not stdout_queue_event.empty(): line = stdout_queue_event.get() if "am_on_resume_called" in line: conn = RE_COM.search(line) if conn : msg_queue.put("APPCH "+conn.group(1)) def ShellExecute(cmd) : fd = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return fd.stdout, fd.stderr, fd.stdin def GetAdbDevices(): parse = False findit = False all_device_list = [] std_out, std_err, std_in = ShellExecute("adb devices") for line in std_out.readlines() : if (parse == True) and ("device" in line): findit = True y = line.split(); all_device_list.append(y[0]) if "List of devices attached" in line: parse = True if findit == True : return all_device_list return [] def ClearAdbCache(): std_out, std_err, std_in = ShellExecute("adb shell logcat -c -b events") return def GetInputDevices(track_list): # format # # add device 18: /dev/input/event15 # name: "Headset" # # _device_dev = [] _device_name = [] _n_device_dev = [] _n_device_name = [] std_out, std_err, std_in = ShellExecute("adb shell getevent -p") for line in std_out.readlines() : if ("add device" in line): y = line.split(); _device_dev.append(y[3]) if ("name:" in line): y = line.split(); _name = y[1].strip("\"") _device_name.append(_name) print "--full input list--" print _device_dev print _device_name if track_list : for _str in _device_name: if _str in track_list: _n_device_name.append(_str) _n_device_dev.append(_device_dev[_device_name.index(_str)]) print "--track input list--" print _n_device_dev print _n_device_name return [_n_device_dev,_n_device_name] else : return [_device_dev,_device_name] def AdbShell(_debug,_in_queue): stdout_queue_event,stdout_reader_event, std_in = adb_log_connect("adb shell") shutdown = True while not stdout_reader_event.eof(): while not _in_queue.empty(): _line = _in_queue.get() if( _line == "END" ) : print "ADB Shell END" os.kill(os.getpid(), signal.SIGINT) return std_in.write(_line) std_in.flush() print "ret AdbShell ~~~~~~~~~~~~~~" def GetStrToSec(re_compile,_str): _c = re_compile.search(_str) _time = (int(_c.group(1))*60*60) + (int(_c.group(2))*60) + (int(_c.group(3))) + (int(_c.group(4)) / 1000000.0) return _time
playevt.py
import adblibs import sys import Queue import re import threading import time import time, os, signal, threading def ExitForPrint(_string): print _string print "LINE : %d"%(l_pc) print "["+l_pcline+"]" exit(-1) re_compile = re.compile("(\d+)\:(\d+)\:(\d+)\.(\d+)") if len(sys.argv) == 1: print "please input file name" exit(1) l_file = open(sys.argv[1], 'r') if not l_file : print "open error file name" exit(1) l_file.close(); with open(sys.argv[1], "r") as l_file: l_content = l_file.read().splitlines() l_file.close(); print "LOC:%d"%(len(l_content)) devices_str = adblibs.GetAdbDevices() if len(devices_str) == 0 : print "No device found" exit(-1) l_devices = adblibs.GetInputDevices("") l_cmd_queue = Queue.Queue() th = threading.Thread(target=adblibs.AdbShell, args=(True,l_cmd_queue)) th.shutdown = True th.start() l_sub_stack = [] # 1 pass scan label l_line_pos = -1 l_label_list = [] l_label_list_idx = [] l_sub_list = [] l_sub_list_idx = [] for l_line in l_content: if( len(l_line) < 3 ) : continue if( l_line[0]=='#' ) : continue # comment l_line_pos = l_line_pos + 1 l_line_split = l_line.split() if ( l_line_split[0]=='LABEL' ) : l_label_list.append(l_line_split[1]) l_label_list_idx.append(l_line_pos) if ( l_line_split[0]=='SUB' ) : l_sub_list.append(l_line_split[1]) l_sub_list_idx.append(l_line_pos) # 2 pass l_pc = -1 while True : l_pc = l_pc + 1 if( l_pc >= len(l_content) ): print "*******LOF*******" l_cmd_queue.put("END") exit(1) break l_pcline = l_content[l_pc] if( len(l_pcline) < 3 ) : continue if( l_pcline[0]=='#' ) : continue # comment l_pc_split =l_pcline.split() if( l_pc_split[0]=='LABEL' ) : continue if( l_pc_split[0]=='END' ) : l_cmd_queue.put("END") print "*******END*******" break if( l_pc_split[0]=='GOTO' ) : if l_pc_split[1] in l_label_list : l_idx = l_label_list.index(l_pc_split[1]) l_pc = l_label_list_idx[l_idx] - 1 else : l_cmd_queue.put("END") ExitForPrint( "LABEL not found :" + l_pc_split[1] ) if( l_pc_split[0]=='GOSUB' ) : if l_pc_split[1] in l_sub_list : l_idx = l_sub_list.index(l_pc_split[1]) l_sub_stack.append(l_pc) if( len(l_sub_stack) > 100 ): l_cmd_queue.put("END") ExitForPrint( "stack overflow" ) l_pc = l_sub_list_idx[l_idx] - 1 else : ExitForPrint( "GOSUB not found :" + l_pc_split[1] ) if( l_pc_split[0]=='SUB' ) : continue if( l_pc_split[0]=='RET' ) : if( len(l_sub_stack) <= 0 ) : l_cmd_queue.put("END") ExitForPrint( "RET error underflow" ) l_pc = l_sub_stack.pop() if( l_pc_split[0]=='INPUT' ) : _time = adblibs.GetStrToSec(re_compile,l_pc_split[1]) if( _time > 0.1 ) : time.sleep( _time ) if l_pc_split[2] in l_devices[1] : _str = "sendevent "+l_devices[0][l_devices[1].index(l_pc_split[2])] + " " + l_pc_split[3] + " " + l_pc_split[4] + " " + l_pc_split[5] + "\r\n" else : print l_devices[0] print l_devices[1] l_cmd_queue.put("END") ExitForPrint( "device not found :" + l_pc_split[2] ) l_cmd_queue.put(_str) #print l_pcline
이벤트 녹화시 결과
INPUT 0:00:00.000000 sec_touchscreen 0 0 0 INPUT 0:00:00.010000 sec_touchscreen 3 57 -1 INPUT 0:00:00.000000 sec_touchscreen 1 330 0 INPUT 0:00:00.000000 sec_touchscreen 1 325 0 INPUT 0:00:00.000000 sec_touchscreen 0 0 0 INPUT 0:00:02.635000 sec_touchkey 1 158 1 INPUT 0:00:00.000000 sec_touchkey 0 0 0 INPUT 0:00:00.120000 sec_touchkey 1 158 0 INPUT 0:00:00.000000 sec_touchkey 0 0 0 APPCH 0:00:00.215000 com.android.contacts.activities.PeopleActivity INPUT 0:00:01.360000 sec_touchkey 1 158 1 INPUT 0:00:00.000000 sec_touchkey 0 0 0 INPUT 0:00:00.095000 sec_touchkey 1 158 0 INPUT 0:00:00.000000 sec_touchkey 0 0 0 APPCH 0:00:00.155000 com.android.launcher2.Launcher INPUT 0:00:02.635000 sec_touchkey 1 158 1 INPUT 0:00:00.000000 sec_touchkey 0 0 0 INPUT 0:00:00.070000 sec_touchkey 1 158 0 INPUT 0:00:00.000000 sec_touchkey 0 0 0
댓글 없음:
댓글 쓰기