# -*-python-*- __package__ = "s60_gps_locator.py" __version__ = "1.0" __author__ = "Aaron Straup Cope" __url__ = "http://www.aaronland.info/python/gps_locator/" __cvsversion__ = "$Revision: 1.5 $" __date__ = "$Date: 2007/04/05 15:23:55 $" __copyright__ = "Copyright (c) 2006-2007 Aaron Straup Cope. Perl Artistic License." # # extlib : s60-simplejson.py # from __future__ import generators import sre_parse, sre_compile, sre_constants from sre_constants import BRANCH, SUBPATTERN from sre import VERBOSE, MULTILINE, DOTALL import re FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL WHITESPACE = re.compile(r'\s*', FLAGS) def pattern(pattern, flags=FLAGS): def decorator(fn): fn.pattern = pattern fn.regex = re.compile(pattern, flags) return fn return decorator def enumerate (list) : lot = [] for i in list : lot.append((len(lot), i)) return lot class Scanner(object): def __init__(self, lexicon, flags=FLAGS): self.actions = [None] s = sre_parse.Pattern() s.flags = flags p = [] for idx, token in enumerate(lexicon): phrase = token.pattern try: subpattern = sre_parse.SubPattern(s, [(SUBPATTERN, (idx + 1, sre_parse.parse(phrase, flags)))]) except sre_constants.error: raise p.append(subpattern) self.actions.append(token) p = sre_parse.SubPattern(s, [(BRANCH, (None, p))]) self.scanner = sre_compile.compile(p) def iterscan(self, string, idx=0, context=None): match = self.scanner.scanner(string, idx).match actions = self.actions lastend = idx end = len(string) while True: m = match() if m is None: break matchbegin, matchend = m.span() if lastend == matchend: break action = actions[m.lastindex] if action is not None: rval, next_pos = action(m, context) if next_pos is not None and next_pos != matchend: matchend = next_pos match = self.scanner.scanner(string, matchend).match yield rval, matchend lastend = matchend def _floatconstants(): import struct import sys _BYTES = '7FF80000000000007FF0000000000000'.decode('hex') nan, inf = struct.unpack('dd', _BYTES) return nan, inf, -inf NaN, PosInf, NegInf = _floatconstants() def linecol(doc, pos): lineno = doc.count('\n', 0, pos) + 1 if lineno == 1: colno = pos else: colno = pos - doc.rindex('\n', 0, pos) return lineno, colno def errmsg(msg, doc, pos, end=None): lineno, colno = linecol(doc, pos) if end is None: return '%s: line %d column %d (char %d)' % (msg, lineno, colno, pos) endlineno, endcolno = linecol(doc, end) return '%s: line %d column %d - line %d column %d (char %d - %d)' % ( msg, lineno, colno, endlineno, endcolno, pos, end) _CONSTANTS = { '-Infinity': NegInf, 'Infinity': PosInf, 'NaN': NaN, 'true': True, 'false': False, 'null': None, } def JSONConstant(match, context, c=_CONSTANTS): return c[match.group(0)], None pattern('(-?Infinity|NaN|true|false|null)')(JSONConstant) def JSONNumber(match, context): match = JSONNumber.regex.match(match.string, *match.span()) integer, frac, exp = match.groups() if frac or exp: res = float(integer + (frac or '') + (exp or '')) else: res = int(integer) return res, None pattern(r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?')(JSONNumber) STRINGCHUNK = re.compile(r'(.*?)(["\\])', FLAGS) BACKSLASH = { '"': u'"', '\\': u'\\', '/': u'/', 'b': u'\b', 'f': u'\f', 'n': u'\n', 'r': u'\r', 't': u'\t', } DEFAULT_ENCODING = "utf-8" def scanstring(s, end, encoding=None, _b=BACKSLASH, _m=STRINGCHUNK.match): if encoding is None: encoding = DEFAULT_ENCODING chunks = [] _append = chunks.append begin = end - 1 while 1: chunk = _m(s, end) if chunk is None: raise ValueError( errmsg("Unterminated string starting at", s, begin)) end = chunk.end() content, terminator = chunk.groups() if content: if not isinstance(content, unicode): content = unicode(content, encoding) _append(content) if terminator == '"': break try: esc = s[end] except IndexError: raise ValueError( errmsg("Unterminated string starting at", s, begin)) if esc != 'u': try: m = _b[esc] except KeyError: raise ValueError( errmsg("Invalid \\escape: %r" % (esc,), s, end)) end += 1 else: esc = s[end + 1:end + 5] try: m = unichr(int(esc, 16)) if len(esc) != 4 or not esc.isalnum(): raise ValueError except ValueError: raise ValueError(errmsg("Invalid \\uXXXX escape", s, end)) end += 5 _append(m) return u''.join(chunks), end def JSONString(match, context): encoding = getattr(context, 'encoding', None) return scanstring(match.string, match.end(), encoding) pattern(r'"')(JSONString) def JSONObject(match, context, _w=WHITESPACE.match): pairs = {} s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == '}': return pairs, end + 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end)) end += 1 encoding = getattr(context, 'encoding', None) iterscan = JSONScanner.iterscan while True: key, end = scanstring(s, end, encoding) end = _w(s, end).end() if s[end:end + 1] != ':': raise ValueError(errmsg("Expecting : delimiter", s, end)) end = _w(s, end + 1).end() try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) pairs[key] = value end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == '}': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end - 1)) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end - 1)) object_hook = getattr(context, 'object_hook', None) if object_hook is not None: pairs = object_hook(pairs) return pairs, end pattern(r'{')(JSONObject) def JSONArray(match, context, _w=WHITESPACE.match): values = [] s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == ']': return values, end + 1 iterscan = JSONScanner.iterscan while True: try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) values.append(value) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == ']': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end)) end = _w(s, end).end() return values, end pattern(r'\[')(JSONArray) ANYTHING = [ JSONObject, JSONArray, JSONString, JSONConstant, JSONNumber, ] JSONScanner = Scanner(ANYTHING) class JSONDecoder(object): _scanner = Scanner(ANYTHING) __all__ = ['__init__', 'decode', 'raw_decode'] def __init__(self, encoding=None, object_hook=None): self.encoding = encoding self.object_hook = object_hook def decode(self, s, _w=WHITESPACE.match): obj, end = self.raw_decode(s, idx=_w(s, 0).end()) end = _w(s, end).end() if end != len(s): raise ValueError(errmsg("Extra data", s, end, len(s))) return obj def raw_decode(self, s, **kw): kw.setdefault('context', self) try: obj, end = self._scanner.iterscan(s, **kw).next() except StopIteration: raise ValueError("No JSON object could be decoded") return obj, end class simplejson : def __init__ (self) : pass def dump(self, obj, fp, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder iterable = cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).iterencode(obj) for chunk in iterable: fp.write(chunk) def dumps(self, obj, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder return cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).encode(obj) def load(self, fp, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(fp.read()) def loads(self, s, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(s) # # extlib : s60-simpleapp.py # import e32 import appuifw class simpleapp : def __init__ (self) : self.__log__ = u"" self.__kill__ = False self.__poll__ = False self.__current__ = u"" self.__activity__ = [] if self.use_lock() : self.__lock__ = e32.Ao_lock() appuifw.app.exit_key_handler = self.abort def abort (self) : self.__kill__ = True self.__lock__.signal() def loop (self) : if self.setup() : self.run() if self.use_lock() : self.__lock__.wait() def setup (self) : self.log("I am running") def run (self) : self.poll() def poll (self) : if self.__poll__ : self.error("Already polling") return False self.__poll__ = True self.do_poll() self.__poll__ = False self.sleep() def do_poll (self) : pass def sleep (self, secs=600) : self.record_activity("Sleeping for %s seconds" % secs) if e32.in_emulator() : return time.sleep(secs) self.poll() else : timer = e32.Ao_timer() timer.after(secs, self.poll) self.__timer__ = timer def use_lock (self) : if e32.in_emulator() : return True return False def log (self, msg, reset=False) : if reset : self.__log__ = unicode("") self.__log__ += "%s\n" % msg self.write(unicode(self.__log__)) def write(self, txt) : if e32.in_emulator() : t = appuifw.Text() t.write(txt) appuifw.app.body = t else : appuifw.app.body = appuifw.Text(txt) def announce (self, msg, type='info') : self.note(msg, type, True) def notice (self, msg) : self.note(msg, 'info') def error (self, msg) : self.note(msg, 'error') def note (self, msg, type, gbl=False) : if not e32.in_emulator and gbl : appuifw.note(unicode(msg), type, gbl) else : appuifw.note(unicode(msg), type) def prompt (self, query, type="text") : if e32.in_emulator() : return raw_input("%s : " % query) return appuifw.query(unicode(query), type) def record_activity (self, msg, display=True) : self.__activity__.append(unicode("* %s" % msg)) if display : self.display_activity() def display_activity (self) : msg = "\n".join(self.__activity__) reset = True self.log(msg, reset) def switch_to_app (self) : self.display_app() self.setup_app_menu() def display_app (self) : reset = True self.log(self.__current__, reset) def switch_to_activity (self) : self.display_activity() self.setup_activity_menu() def setup_activity_menu (self) : pass def setup_app_menu (self) : pass def setup_simplestore(self, storename) : storepath = storename if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\%s' % storename else : storepath = 'c:\\%s' % storename try : self.__store__ = simplestore(storepath) except Exception, e : self.error("Failed to open local datastore, %s" % e) return False return True def setup_access_point (self) : if e32.in_emulator() : return True import socket try : apid = socket.select_access_point() apnt = socket.access_point(apid) socket.set_default_access_point(apnt) except Exception, e : self.error("Failed to setup access point : %s" % e) return False return True def vibrate (self) : try : import misty misty.vibrate(500, 100) except Exception, e : return False # # extlib : s60-simplestore.py # import e32 class simplestore : def __init__ (self, path) : self.__path__ = path self.__db__ = None self.open() def open (self) : if e32.in_emulator() : import anydbm self.__db__ = anydbm.open(self.__path__, 'c') else : import e32dbm self.__db__ = e32dbm.open(self.__path__, 'c') def close (self) : self.__db__.close() self.__db__ = None def read (self, key) : key = str(key) if self.__db__.has_key(key) : return self.__db__[key] return None def write (self, key, value) : key = str(key) value = str(value) self.__db__[key] = value self.save() def save (self) : self.close() self.open() # # extlib : s60-simplegps.py # import socket def simplegps_bt_enabled () : if not e32.in_emulator() : return True try : import lightblue return True except Exception, e : return False def simplegps_selectservice() : if e32.in_emulator() : if not simplegps_bt_enabled() : return None import lightblue s = lightblue.selectservice() addr = s[0] service = s[1] else : addr, services = socket.bt_discover() service = services.values()[0] return (addr, int(service)) class simplegps : def __init__ (self, addr, service) : self.__addr = addr self.__service = service self.__target = (self.__addr, self.__service) self.__sock = None def connect (self) : try : if e32.in_emulator() : import lightblue sock = lightblue.socket() else : sock = socket.socket(socket.AF_BT, socket.SOCK_STREAM) sock.connect(self.__target) self.__sock = sock except Exception, e : raise Exception("Connect error : %s" % e) def disconnect (self) : if not self.__sock : return self.__sock.close() self.__sock = None def plot (self) : try : ln = self.read() except Exception, e : raise Exception("Read error : %s" % e) try : return self.parse(ln) except Exception, e : raise Exception("Parse error : %s" % e) def read (self, type="$GPRMC") : ln = "" while 1: char = self.__sock.recv(1) if not char: break ln += char if char == "\n": if ln.startswith(type) : break else : ln = '' return ln def parse (self, gps_data) : lat = '' lon = '' if (gps_data.count(",") == 11) : (GPRMC,hhmmss,status,lat,l1,lon,l2,speed_knots,course,ddmmyy,mvd,mdr_cs)=gps_data.split(",") elif (gps_data.count(",") == 12) : (GPRMC,hhmmss,status,lat,l1,lon,l2,speed_knots,course,ddmmyy,mvd,mdr,fix_cs)=gps_data.split(",") else: pass if (status is 'A' and lat is not "" and lon is not ""): lat, lon = self.parse_latlong(lat, l1, lon, l2) dt = self.parse_dt(ddmmyy, hhmmss) return (dt, lat, lon) def parse_dt (self, ddmmyy, hhmmss) : d = (ddmmyy[4:6], ddmmyy[2:4], ddmmyy[0:2]) t = (hhmmss[0:2], hhmmss[2:4], hhmmss[4:6]) d = "-".join(d) t = ":".join(t) return "20%sT%s" % (d, t) def parse_latlong (self, lat, l1, lon, l2): try: latitude = float(lat[0:2]) + (float(lat[2:4] + "." + lat[5:9])/60) longitude = float(lon[0:3]) + (float(lon[3:5] + "." + lon[6:10])/60) if l1 is "S": latitude = -latitude if l2 is "W": longitude = -longitude latitude = float("%.6f" % latitude) longitude = float("%.6f" % longitude) return latitude, longitude except: return (None, None) # # extlib : s60-simplegpsrecorder.py # class simplegpsrecorder (simpleapp) : def __init__ (self) : simpleapp.__init__(self) self.__cfg = None self.__gps = None self.__log = None self.__poll = True self.__delay = 60 self.__bt_enabled = simplegps_bt_enabled() self.__pos = () def setup (self) : if not self.setup_cfg() : self.record_activity("Setup failed : config") return False if not self.setup_gps() : self.record_activity("Setup failed : GPS") return False if not self.setup_logger() : self.record_activity("Setup failed : logging") return False self.setup_menu() self.record_activity("Setup complete") return True def setup_cfg (self) : storepath = 'test' if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\gpsr.cfg' else : storepath = 'c:\\gpsr.cfg' try : self.__cfg = simplestore(storepath) except Exception, e : self.error("Failed to load configs : %s" % e) return False return True def setup_gps (self, force=False) : if not self.bt_enabled() : return True try : (addr, service) = self.get_device_config(force) self.__gps = simplegps(addr, service) except Exception, e : self.error("Failed to load device glue : %s" % e) return False return True def setup_menu (self) : items = self.menu_items() appuifw.app.menu = items def menu_items (self) : items = [] if self.__poll : items.append((unicode("Pause"), self.pause)) else : items.append((unicode("Resume"), self.resume)) items.append((unicode("Set delay"), self.set_delay)) items.append((unicode("Configure GPS device"), self.configure_gps_device)) return items def configure_gps_device (self) : already_paused = self.__poll if not already_paused : self.pause() force = True res = self.setup_gps(force) if not already_paused : poll = False self.resume(poll) if res : self.record_activity("GPS device settings saved") return res def get_device_config (self, force=False) : addr = self.__cfg.read('bt_addr') service = self.__cfg.read('bt_service') if force or not addr or not service : try : addr, service = simplegps_selectservice() except Exception, e : raise Exception("Discovery error : %s" % e) try : self.__cfg.write('bt_addr', addr) self.__cfg.write('bt_service', service) except Exception, e : self.error("Failed to record device information : %s" % e) return (addr, int(service)) def define_logroot (self) : storepath = './gpsr' if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\gpsr' else : storepath = 'c:\\gpsr' return storepath def setup_logger (self) : self.__log = simplelogger() storepath = self.define_logroot() self.__log.set_logroot(storepath) return True def do_poll (self) : if not self.__poll : return self.record_activity("Polling GPS device") if self.bt_enabled() : try : self.__gps.connect() except Exception, e : self.error("SNFU : %s" % e) return False try : (dt, lat, lon) = self.plot() self.process_data(dt, lat, lon) except Exception, e : self.error("Failed to generate location information : %s" % e) if self.bt_enabled() : self.__gps.disconnect() def plot (self) : try : (dt, lat, lon) = self.__gps.plot() return (dt, lat, lon) except Exception, e : raise Exception("Plot error : %s" % e) def process_data (self, dt, lat, lon) : self.record_activity("process GPS data here") def sleep (self) : delay = self.get_delay() simpleapp.sleep(self, delay) def resume (self, poll=True) : self.__poll = True self.setup_menu() if poll : self.poll() self.record_activity("GPS polling restarted") def pause (self) : self.__poll = False self.setup_menu() self.record_activity("GPS polling paused") def set_delay (self) : delay = self.prompt(u"Seconds between polling", "text") self.__cfg.write('delay', delay) n = self.__cfg.read('delay') self.record_activity("Set delay to %s seconds" % n) def get_delay (self) : n = self.__cfg.read('delay') if not n or not int(n) : n = self.__delay return int(n) def transfer_logs (self) : addr = None service = None try : addr, services = socket.bt_obex_discover() service = services.values()[0] except Exception, e : self.error("Discovery error : %s" % e) return False import os import os.path logroot = self.define_logroot() filelist = os.listdir(logroot) for fname in filelist : path = os.path.join(logroot, fname) self.record_activity("Send %s" % path) self.send_log(addr, service, unicode(path)) self.record_activity("Transfer complete") def send_log (self, addr, service, path) : try : socket.bt_obex_send_file(addr, service, path) except Exception, e : self.error("Transfer error : %s" % e) return False return True def bt_enabled (self) : return self.__bt_enabled # # extlib : s60-simplegeonames.py # class simplegeonames : def find_nearby_place_name (self, lat, lon) : path = "/findNearbyPlaceNameJSON?lat=%s&lng=%s" % (lat, lon) try : http = simplehttp() res = http.get('ws.geonames.org', path) except Exception, e : raise Exception("Query error : %s" % e) try : json = simplejson() data = json.loads(res) except Exception, e : raise Exception("Parse error : %s" % e) return data['geonames'] # # extlib : s60-simplelogger.py # import os import os.path class simplelogger : def __init (self) : self.__queue = [] self.__logroot = '' self.__logfile = '' def set_logroot (self, root) : self.__logroot = root def set_logfile (self, fname) : self.__logfile = fname def log (self, msg) : try : self.log_to_disk(msg) except Exception, e: raise Exception(e) return True def log_to_disk (self, msg) : if not os.path.isdir(self.__logroot) : os.mkdir(self.__logroot) path = "%s\%s" % (self.__logroot, self.__logfile) try : fh = open(path, "a") fh.write(msg) fh.close() except Exception, e : raise Exception("Failed to open %s : %s" % (path, e)) return True # # extlib : s60-simplehttp.py # import httplib import base64 class simplehttp : def __init__ (self, creds=None) : self.__auth__ = None if not creds == None : user = creds[0] pswd = creds[1] self.__auth__ = base64.encodestring(user + ":" + pswd) def get (self, host, path, headers={}) : return self.execute_http_request('GET', host, path, headers) def post (self, host, path, params=None, headers={}) : return self.execute_http_request('POST', host, path, params) def execute_http_request(self, meth, host, path, params=None, headers={}) : if self.__auth__ : headers['Authorization'] = "Basic %s" % self.__auth__ c = httplib.HTTPConnection(host) c.request(meth, path, params, headers) r = c.getresponse() return r.read() class gps_locator (simplegpsrecorder) : def __init__ (self) : simplegpsrecorder.__init__(self) self.__currently = [] # # # def setup (self) : if not self.setup_access_point() : return False return simplegpsrecorder.setup(self) # # # def menu_items (self) : items = simplegpsrecorder.menu_items(self) items.append((u"Tell Twitter", self.tell_twitter)) return items # # # def process_data (self, dt, lat, lon) : self.__currently = [] if not lat or not lon : self.record_activity("Missing lat/lon; skipping") return False geo = simplegeonames() self.record_activity("Where is %s, %s" % (lat, lon)) try : res = geo.find_nearby_place_name(lat, lon) except Exception, e : raise Exception("Processing error : %s" % e) for pl in res : name = self.format_place_name(pl) self.__currently.append(name) loc = "geonames.org thinks it is : %s" % name self.record_activity(loc) return True # # # def tell_twitter (self) : if len(self.__currently) == 0 : self.error("I don't know where I am!") return False msg = "is in %s" % " or maybe ".join(self.__currently) if not e32.in_emulator() : try : import messaging messaging.sms_send(40404, unicode(msg)) except Exception, e : self.error("Messaging error : %s" % e) return False self.record_activity("Twitter message sent : %s" % msg) return True # # # def format_place_name (self, pl) : name = "%s, %s, %s" % (pl['name'], pl['adminName1'], pl['countryName']) return name # # # def plot (self) : return simplegpsrecorder.plot(self) # # # if __name__ == "__main__" : gps = gps_locator() gps.loop()