2015년 2월 28일 토요일

android getevent 로 이벤트 녹화하기 및 재생하기 : python



안드로이드에서는 adb getevent를 이용하여 이벤트를 녹화 할 수 있다고 검색결과가 여기저기 나와있었습니다.
흥미로운 사실은 sendevent를 이용하여 그대로 재생도 할 수 있는 것 같아서, 아 뭔가 만들 수 있겠구나 해서 작업을 시작하였습니다.
http://cafe.naver.com/ruletheskycafe/418610

이벤트 녹화는 두개의 파일을 이용하여 제작 하였고 recevt.py adblibs.py 파일을 사용하며,
이벤트 재생은 playevt.py adblibs.py 파일을 사용하도록 만들었습니다.
스크립트 재생 시 반복과 function 처리를 구현할 수 있도록 대충 만들어, 90% 정도의 완성률을 보였을 때 테스트해 보았습니다.




하지만 결과는 대 실패였습니다


sendevent로 event를 보내는 timing이 너무 느려서 터치 이벤트를 보내기에는 무리가 있었습니다.

교훈 : 미리 prototype을 만들어 테스트 해볼 것, 그리고 인터넷 자료를 너무 믿지 말자.

작업하다가 중단한 소스를 공개합니다.


모델에 따라서 track_devices_str 변수는 변경해 줘야 합니다. track할 device이름들입니다.
이벤트의 녹화 및 app change 되는 로그를 동시에 남기도록 하였습니다.

recevt.py


#
# Record event
# 2015.1.7
#

import sys
import Queue
import adblibs
import threading
import datetime
import time

# ---------------------------------------------------------------------------
# recevt.py main script.
#
# Starts two worker threads from adblibs (one for "getevent" input events, one
# for the logcat "events" buffer), then writes every message they queue to a
# timestamped .rec file.  Each record is "<TAG> <delta> <payload>", where
# <delta> is the time elapsed since the previous record.
# ---------------------------------------------------------------------------

debug_mode = True
# Input device names to record; these are model specific.
track_devices_str = ["sec_touchscreen","sec_touchkey","hall","gpio_keys.9"]
cmdline_getevent = "adb shell getevent"

cmdline_geteventlog = "adb shell logcat -b events"


ts = time.time()
save_file_name = "eventrec_"+datetime.datetime.fromtimestamp(ts).strftime('%Y%m%d_%H%M%S')+".rec"

print("*************************************************************")
print("** Record event system since 2015 7 Jan, powered by Python **")
print("*************************************************************")
print("save file name:"+save_file_name)
print("Track devices:")
print(track_devices_str)


devices_str = adblibs.GetAdbDevices()
if len(devices_str) == 0:
    print("No device found")
    exit(-1)

rec_file = open(save_file_name, 'w')

for _str in devices_str:
    print("device:"+_str)

adblibs.ClearAdbCache()

_devices = adblibs.GetInputDevices(track_devices_str)

msg_queue = Queue.Queue()
th = threading.Thread(target=adblibs.GetEventThreadWithTrackInfo, args=(cmdline_getevent,_devices,msg_queue,debug_mode))
th.start()
the = threading.Thread(target=adblibs.GetEventLogThread, args=(cmdline_geteventlog,msg_queue,debug_mode))
the.start()

# Timestamp of the previously written record; None until the first one.
last_time_stamp = None

print("Main Thread")
try:
    # Keep going while a producer is still running or messages are pending.
    while th.is_alive() or the.is_alive() or not msg_queue.empty():
        try:
            # Block with a timeout instead of spinning on empty(); the timeout
            # lets the loop condition be re-checked periodically.
            line = msg_queue.get(timeout=0.1)
        except Queue.Empty:
            continue
        if len(line) == 0:
            continue

        line_split = line.split(" ",1)
        if len(line_split) < 2:
            # A record needs a tag and a payload.
            continue

        # Read the clock once so no time is lost between computing the delta
        # and storing the new reference point.
        now = datetime.datetime.now()
        if last_time_stamp is None:
            time_diff_str = '0:00:00.000000'
        else:
            time_diff_str = str(now - last_time_stamp)
            # str(timedelta) omits the fraction when microseconds == 0.
            if "." not in time_diff_str:
                time_diff_str = time_diff_str+".000000"
        last_time_stamp = now

        record = "%s %s %s"%(line_split[0], time_diff_str ,line_split[1])
        print(record)
        # Records are terminated with a bare "\r" (existing file format).
        rec_file.write(record+"\r")
        if msg_queue.empty():
            rec_file.flush()
finally:
    rec_file.close()

print("Out")
th.join()
the.join()



adblibs.py



'''
adb library
'''

import subprocess
import threading
import Queue
import datetime
import time
import re
import subprocess
from ctypes import c_short
import time, os, signal, threading

class AsynchronousFileReader(threading.Thread):
    """Background thread that forwards lines from a file object to a queue.

    Every line returned by ``fd.readline()`` is put on ``queue`` so another
    thread can consume the stream without blocking on the read itself.
    """

    def __init__(self, fd, queue):
        assert isinstance(queue, Queue.Queue)
        assert callable(fd.readline)
        threading.Thread.__init__(self)
        self._queue = queue
        self._fd = fd

    def run(self):
        """Thread body: copy lines to the queue until readline() returns ''."""
        read_line = self._fd.readline
        push = self._queue.put
        while True:
            text = read_line()
            if text == '':
                break
            push(text)

    def eof(self):
        """Return True once the thread has stopped and the queue is empty."""
        if self.is_alive():
            return False
        return self._queue.empty()


def adb_log_connect(cmdline):
    """Start ``cmdline`` as a child process and read its stdout in a thread.

    cmdline: the command to run, either a single string such as
        "adb shell getevent" or an argument list.

    Returns a list ``[stdout_queue, stdout_reader, stdin]``: the queue that
    receives the child's stdout lines, the AsynchronousFileReader thread that
    fills it, and the child's stdin pipe.
    """
    import shlex

    # Without shell=True a multi-word string is taken as the program name on
    # POSIX, so split it into an argument list there.  On Windows the string
    # is passed through unchanged.
    if os.name != "nt" and not isinstance(cmdline, (list, tuple)):
        cmdline = shlex.split(cmdline)

    # NOTE: stderr is piped but never read by this function.
    process = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Launch the asynchronous reader of the process' stdout.
    stdout_queue = Queue.Queue()
    stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
    stdout_reader.start()
    return [stdout_queue, stdout_reader, process.stdin]


def GetEventThread(cmdline_getevent,track_devices_str,msg_queue,debug_mode):
    """Run ``getevent``, discover device nodes, and queue tracked events.

    The first part of the output ("add device" / "name:" lines) is used to
    learn which device node belongs to each name in ``track_devices_str``.
    Every later event line coming from one of those nodes is put on
    ``msg_queue`` as ``"INPUT <name> <type> <code> <value>"`` with the three
    numbers converted from hexadecimal to decimal.

    cmdline_getevent: command handed to adb_log_connect().
    track_devices_str: list of input device names to track.
    msg_queue: queue that receives the formatted event strings.
    debug_mode: when true, print the discovered devices.
    """
    all_device_list = []
    all_device_name_list = []
    # One slot per tracked name; stays "nodev" until its node is discovered.
    track_devices_name = ["nodev"] * len(track_devices_str)
    dev_path = None

    stdout_queue_event, stdout_reader_event, dummy = adb_log_connect(cmdline_getevent)

    device_detect_mode = True
    while not stdout_reader_event.eof():
        try:
            # Block with a timeout instead of spinning on empty().
            line = stdout_queue_event.get(timeout=0.1)
        except Queue.Empty:
            continue
        y = line.split()
        if not y:
            # Blank line: nothing to parse.
            continue

        if device_detect_mode:
            if "add device" in line and len(y) > 3:
                dev_path = y[3]

            if "name:" in line and len(y) > 1 and dev_path is not None:
                # Only the first whitespace-delimited token of the name is kept.
                dev_label = y[1].strip("\"")

                all_device_list.append(dev_label)
                all_device_name_list.append(dev_path)

                if dev_label in track_devices_str:
                    track_devices_name[track_devices_str.index(dev_label)] = dev_path
                    if debug_mode:
                        print(dev_path + " : " + dev_label)

            if "/dev/" in y[0]:
                # First event line: the device listing is over.
                if debug_mode:
                    print("--------------------------")
                    print("track list")
                    print(track_devices_str)
                    print(track_devices_name)
                    print("--------------------------")
                    print("full list")
                    print(all_device_list)
                    print(all_device_name_list)
                    print("--------------------------")
                device_detect_mode = False

        y2 = y[0].strip(":").strip()
        if y2 in track_devices_name and len(y) > 3:
            # The event value is a signed 32-bit integer printed as hex;
            # reinterpret it as two's complement (ffffffff -> -1).
            value = int(y[3], 16) & 0xFFFFFFFF
            if value & 0x80000000:
                value -= 0x100000000
            tracked = track_devices_str[track_devices_name.index(y2)]
            msg_queue.put("INPUT %s %d %d %d" % (tracked, int(y[1], 16), int(y[2], 16), value))
    
    
def GetEventThreadWithTrackInfo(cmdline_getevent,track_dev_str,msg_queue,debug_mode):
    """Run ``getevent`` and queue events from already-known device nodes.

    cmdline_getevent: command handed to adb_log_connect().
    track_dev_str: ``[device_nodes, device_names]`` as two parallel lists,
        the shape returned by GetInputDevices().
    msg_queue: queue that receives ``"INPUT <name> <type> <code> <value>"``
        strings, with the three numbers converted from hex to decimal.
    debug_mode: accepted for signature parity; not used here.
    """
    dev_nodes = track_dev_str[0]
    dev_names = track_dev_str[1]

    stdout_queue_event, stdout_reader_event, dummy = adb_log_connect(cmdline_getevent)
    while not stdout_reader_event.eof():
        try:
            # Block with a timeout instead of spinning on empty().
            line = stdout_queue_event.get(timeout=0.1)
        except Queue.Empty:
            continue
        y = line.split()
        if len(y) < 4:
            # Blank or non-event line ("<node>: <type> <code> <value>" expected).
            continue

        y2 = y[0].strip(":").strip()
        if y2 in dev_nodes:
            # The event value is a signed 32-bit integer printed as hex;
            # reinterpret it as two's complement (ffffffff -> -1).
            value = int(y[3], 16) & 0xFFFFFFFF
            if value & 0x80000000:
                value -= 0x100000000
            name = dev_names[dev_nodes.index(y2)]
            msg_queue.put("INPUT %s %d %d %d" % (name, int(y[1], 16), int(y[2], 16), value))
    

def GetEventLogThread(cmdline_getevent,msg_queue,debug_mode):
    """Watch the event log and queue an ``APPCH`` message per resumed activity.

    cmdline_getevent: command handed to adb_log_connect() (the caller passes
        the logcat "events" buffer command).
    msg_queue: queue that receives ``"APPCH <component>"`` strings.
    debug_mode: accepted for signature parity; not used here.
    """
    stdout_queue_event, stdout_reader_event, dummy = adb_log_connect(cmdline_getevent)
    # Matches e.g. "I/am_on_resume_called( 1234): [0,com.foo.Bar]".  The first
    # field may have several digits, and a further field may follow the
    # component name (presumably emitted by newer Android releases - TODO
    # confirm).
    RE_COM = re.compile(r"I/am_on_resume_called\([\s]*\d+\)\:\s\[\d+,([a-zA-Z0-9_.]+)[\],]")
    while not stdout_reader_event.eof():
        try:
            # Block with a timeout instead of spinning on empty().
            line = stdout_queue_event.get(timeout=0.1)
        except Queue.Empty:
            continue
        if "am_on_resume_called" not in line:
            continue
        conn = RE_COM.search(line)
        if conn:
            msg_queue.put("APPCH "+conn.group(1))
 
 

 
def ShellExecute(cmd):
    """Run ``cmd`` through the shell with all three standard streams piped.

    Returns the child's ``(stdout, stderr, stdin)`` pipe objects, in that
    order.  The call does not wait for the command to finish.
    """
    pipe = subprocess.PIPE
    child = subprocess.Popen(cmd, shell=True, stdin=pipe, stdout=pipe, stderr=pipe)
    return child.stdout, child.stderr, child.stdin


def GetAdbDevices():
    """Return the serial numbers that ``adb devices`` reports as ready.

    Only rows after the "List of devices attached" header are considered, and
    a row counts only when its second column is exactly "device".  Rows in
    another state, or error rows that merely contain the word "device", are
    ignored.

    Returns a list of serial strings; empty when nothing usable is attached.
    """
    std_out, std_err, std_in = ShellExecute("adb devices")
    serials = []
    in_listing = False
    for line in std_out.readlines():
        if in_listing:
            fields = line.split()
            if len(fields) >= 2 and fields[1] == "device":
                serials.append(fields[0])
        elif "List of devices attached" in line:
            in_listing = True
    return serials

def ClearAdbCache():
    """Clear the device's "events" logcat buffer and wait until adb is done."""
    std_out, std_err, std_in = ShellExecute("adb shell logcat -c -b events")
    # ShellExecute() does not wait for the command.  Reading stdout to EOF
    # blocks until adb exits, so the clear has finished before the caller
    # starts reading the log.
    std_out.read()
    return

def GetInputDevices(track_list):
    """List the device's input nodes and names from ``adb shell getevent -p``.

    The parsed output looks like:

        add device 18: /dev/input/event15
          name:     "Headset"

    track_list: names to keep.  When empty (or otherwise false) every device
        is returned.

    Returns ``[device_nodes, device_names]`` as two parallel lists.  Only the
    first whitespace-delimited token of each name is kept.
    """
    _device_dev = []
    _device_name = []
    std_out, std_err, std_in = ShellExecute("adb shell getevent -p")
    for line in std_out.readlines():
        if "add device" in line:
            _device_dev.append(line.split()[3])
        if "name:" in line:
            _device_name.append(line.split()[1].strip("\""))
    print("--full input list--")
    print(_device_dev)
    print(_device_name)

    if not track_list:
        return [_device_dev, _device_name]

    _n_device_dev = []
    _n_device_name = []
    # Pair each node with its own name by position, so two devices that share
    # a name keep their distinct nodes.
    for _dev, _name in zip(_device_dev, _device_name):
        if _name in track_list:
            _n_device_name.append(_name)
            _n_device_dev.append(_dev)

    print("--track input list--")
    print(_n_device_dev)
    print(_n_device_name)
    return [_n_device_dev, _n_device_name]

def AdbShell(_debug,_in_queue):
    """Feed command strings from ``_in_queue`` to an interactive ``adb shell``.

    _debug: accepted for signature parity; not used here.
    _in_queue: queue of strings to write to the shell's stdin.  The special
        item "END" stops the loop and sends SIGINT to this process.
    """
    stdout_queue_event, stdout_reader_event, std_in = adb_log_connect("adb shell")
    while not stdout_reader_event.eof():
        # Throw away whatever the shell printed.  Left alone, the reader's
        # queue would grow without bound and eof() could never become true.
        while not stdout_queue_event.empty():
            stdout_queue_event.get()

        try:
            # Block with a timeout instead of spinning on empty().
            _line = _in_queue.get(timeout=0.05)
        except Queue.Empty:
            continue

        if _line == "END":
            print("ADB Shell END")
            # Interrupt the whole process, main thread included.
            os.kill(os.getpid(), signal.SIGINT)
            return
        std_in.write(_line)
        std_in.flush()
    print("ret AdbShell ~~~~~~~~~~~~~~")

 
def GetStrToSec(re_compile,_str):
    """Convert an ``H:MM:SS.ffffff`` time string to seconds as a float.

    re_compile: compiled pattern with four groups (hours, minutes, seconds,
        fractional digits).
    _str: the text to search for the time.

    The fractional group is read as a decimal fraction, so ".5" means half a
    second and ".010000" means ten milliseconds.

    Raises ValueError when ``_str`` does not contain a matching time.
    """
    _c = re_compile.search(_str)
    if _c is None:
        raise ValueError("no H:MM:SS.ffffff time found in %r" % (_str,))
    _hours = int(_c.group(1))
    _minutes = int(_c.group(2))
    _seconds = int(_c.group(3))
    _fraction = float("0." + _c.group(4))
    return (_hours*60*60) + (_minutes*60) + _seconds + _fraction


playevt.py


import adblibs
import sys
import Queue
import re
import threading
import time
import time, os, signal, threading

def ExitForPrint(_string):
    """Print ``_string`` plus the current script position, then exit with -1.

    The position comes from the module-level ``l_pc`` (line index) and
    ``l_pcline`` (text of that line).
    """
    print(_string)
    print("LINE : %d" % l_pc)
    print("[%s]" % l_pcline)
    exit(-1)

# ---------------------------------------------------------------------------
# playevt.py main script.
#
# Reads a recorded script (file name in sys.argv[1]) and interprets it line by
# line.  INPUT lines become "sendevent" commands queued for the adb shell
# thread.  LABEL/GOTO and SUB/GOSUB/RET provide jumps and subroutines, END
# stops playback.  Lines shorter than 3 characters or starting with '#' are
# skipped.
# ---------------------------------------------------------------------------

re_compile = re.compile(r"(\d+)\:(\d+)\:(\d+)\.(\d+)")

if len(sys.argv) == 1:
    print("please input file name")
    exit(1)

# open() raises IOError on failure instead of returning a false value, so the
# error has to be caught to report it.
try:
    with open(sys.argv[1], "r") as l_file:
        l_content = l_file.read().splitlines()
except IOError:
    print("open error file name")
    exit(1)

print("LOC:%d"%(len(l_content)))

devices_str = adblibs.GetAdbDevices()
if len(devices_str) == 0:
    print("No device found")
    exit(-1)

# An empty track list makes GetInputDevices() return every input device.
l_devices = adblibs.GetInputDevices("")

l_cmd_queue = Queue.Queue()
th = threading.Thread(target=adblibs.AdbShell, args=(True,l_cmd_queue))
th.start()

l_sub_stack = []

# 1 pass scan label
# The stored positions are indexes into l_content, the same numbering that
# l_pc uses in pass 2, so comments and blank lines do not shift jump targets.
l_label_list = []
l_label_list_idx = []
l_sub_list = []
l_sub_list_idx = []
for l_line_pos, l_line in enumerate(l_content):
    if len(l_line) < 3:
        continue
    if l_line[0] == '#':
        continue  # comment
    l_line_split = l_line.split()
    if len(l_line_split) < 2:
        continue  # LABEL / SUB need a name
    if l_line_split[0] == 'LABEL':
        l_label_list.append(l_line_split[1])
        l_label_list_idx.append(l_line_pos)
    elif l_line_split[0] == 'SUB':
        l_sub_list.append(l_line_split[1])
        l_sub_list_idx.append(l_line_pos)

# Minimum number of whitespace-separated fields for commands with arguments.
l_min_fields = {'GOTO': 2, 'GOSUB': 2, 'INPUT': 6}

# 2 pass
l_pc = -1
while True:
    l_pc = l_pc + 1
    if l_pc >= len(l_content):
        print("*******LOF*******")
        l_cmd_queue.put("END")
        exit(1)

    l_pcline = l_content[l_pc]

    if len(l_pcline) < 3:
        continue
    if l_pcline[0] == '#':
        continue  # comment

    l_pc_split = l_pcline.split()
    if not l_pc_split:
        continue  # whitespace-only line
    l_cmd = l_pc_split[0]

    # Definitions are markers only; execution just steps over them.
    if l_cmd == 'LABEL' or l_cmd == 'SUB':
        continue

    if l_cmd == 'END':
        l_cmd_queue.put("END")
        print("*******END*******")
        break

    if len(l_pc_split) < l_min_fields.get(l_cmd, 1):
        l_cmd_queue.put("END")
        ExitForPrint( "syntax error : missing argument" )

    if l_cmd == 'GOTO':
        if l_pc_split[1] in l_label_list:
            l_idx = l_label_list.index(l_pc_split[1])
            # -1 because the loop increments l_pc before the next fetch.
            l_pc = l_label_list_idx[l_idx] - 1
        else:
            l_cmd_queue.put("END")
            ExitForPrint( "LABEL not found :" + l_pc_split[1] )

    elif l_cmd == 'GOSUB':
        if l_pc_split[1] in l_sub_list:
            l_idx = l_sub_list.index(l_pc_split[1])
            # Remember the GOSUB line; RET resumes on the line after it.
            l_sub_stack.append(l_pc)
            if len(l_sub_stack) > 100:
                l_cmd_queue.put("END")
                ExitForPrint( "stack overflow" )
            l_pc = l_sub_list_idx[l_idx] - 1
        else:
            # Stop the shell thread before exiting, as the other error paths do.
            l_cmd_queue.put("END")
            ExitForPrint( "GOSUB not found :" + l_pc_split[1] )

    elif l_cmd == 'RET':
        if len(l_sub_stack) <= 0:
            l_cmd_queue.put("END")
            ExitForPrint( "RET error underflow" )
        l_pc = l_sub_stack.pop()

    elif l_cmd == 'INPUT':
        _time = adblibs.GetStrToSec(re_compile,l_pc_split[1])
        # Delays of 0.1 s or less are not waited for.
        if _time > 0.1:
            time.sleep( _time )
        if l_pc_split[2] not in l_devices[1]:
            print(l_devices[0])
            print(l_devices[1])
            l_cmd_queue.put("END")
            ExitForPrint( "device not found :" + l_pc_split[2] )
        _dev = l_devices[0][l_devices[1].index(l_pc_split[2])]
        _str = "sendevent "+_dev + " " + l_pc_split[3] + " " + l_pc_split[4] + " " + l_pc_split[5] + "\r\n"
        l_cmd_queue.put(_str)
 

이벤트 녹화시 결과

INPUT 0:00:00.000000 sec_touchscreen 0 0 0
INPUT 0:00:00.010000 sec_touchscreen 3 57 -1
INPUT 0:00:00.000000 sec_touchscreen 1 330 0
INPUT 0:00:00.000000 sec_touchscreen 1 325 0
INPUT 0:00:00.000000 sec_touchscreen 0 0 0
INPUT 0:00:02.635000 sec_touchkey 1 158 1
INPUT 0:00:00.000000 sec_touchkey 0 0 0
INPUT 0:00:00.120000 sec_touchkey 1 158 0
INPUT 0:00:00.000000 sec_touchkey 0 0 0
APPCH 0:00:00.215000 com.android.contacts.activities.PeopleActivity
INPUT 0:00:01.360000 sec_touchkey 1 158 1
INPUT 0:00:00.000000 sec_touchkey 0 0 0
INPUT 0:00:00.095000 sec_touchkey 1 158 0
INPUT 0:00:00.000000 sec_touchkey 0 0 0
APPCH 0:00:00.155000 com.android.launcher2.Launcher
INPUT 0:00:02.635000 sec_touchkey 1 158 1
INPUT 0:00:00.000000 sec_touchkey 0 0 0
INPUT 0:00:00.070000 sec_touchkey 1 158 0
INPUT 0:00:00.000000 sec_touchkey 0 0 0

댓글 없음:

댓글 쓰기