from __future__ import generators # -*-python-*- __package__ = "s60_twitteradio.py" __version__ = "1.0" __author__ = "Aaron Straup Cope" __url__ = "http://www.aaronland.info/python/twitteradio/" __cvsversion__ = "$Revision: 1.5 $" __date__ = "$Date: 2007/07/30 01:31:26 $" __copyright__ = "Copyright (c) 2006-2007 Aaron Straup Cope. Perl Artistic License." # # extlib : s60-simplejson.py # import sre_parse, sre_compile, sre_constants from sre_constants import BRANCH, SUBPATTERN from sre import VERBOSE, MULTILINE, DOTALL import re FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL WHITESPACE = re.compile(r'\s*', FLAGS) def pattern(pattern, flags=FLAGS): def decorator(fn): fn.pattern = pattern fn.regex = re.compile(pattern, flags) return fn return decorator def enumerate (list) : lot = [] for i in list : lot.append((len(lot), i)) return lot class Scanner(object): def __init__(self, lexicon, flags=FLAGS): self.actions = [None] s = sre_parse.Pattern() s.flags = flags p = [] for idx, token in enumerate(lexicon): phrase = token.pattern try: subpattern = sre_parse.SubPattern(s, [(SUBPATTERN, (idx + 1, sre_parse.parse(phrase, flags)))]) except sre_constants.error: raise p.append(subpattern) self.actions.append(token) p = sre_parse.SubPattern(s, [(BRANCH, (None, p))]) self.scanner = sre_compile.compile(p) def iterscan(self, string, idx=0, context=None): match = self.scanner.scanner(string, idx).match actions = self.actions lastend = idx end = len(string) while True: m = match() if m is None: break matchbegin, matchend = m.span() if lastend == matchend: break action = actions[m.lastindex] if action is not None: rval, next_pos = action(m, context) if next_pos is not None and next_pos != matchend: matchend = next_pos match = self.scanner.scanner(string, matchend).match yield rval, matchend lastend = matchend def _floatconstants(): import struct import sys _BYTES = '7FF80000000000007FF0000000000000'.decode('hex') nan, inf = struct.unpack('dd', _BYTES) return nan, inf, -inf NaN, PosInf, NegInf = _floatconstants() def linecol(doc, pos): lineno = doc.count('\n', 0, pos) + 1 if lineno == 1: colno = pos else: colno = pos - doc.rindex('\n', 0, pos) return lineno, colno def errmsg(msg, doc, pos, end=None): lineno, colno = linecol(doc, pos) if end is None: return '%s: line %d column %d (char %d)' % (msg, lineno, colno, pos) endlineno, endcolno = linecol(doc, end) return '%s: line %d column %d - line %d column %d (char %d - %d)' % ( msg, lineno, colno, endlineno, endcolno, pos, end) _CONSTANTS = { '-Infinity': NegInf, 'Infinity': PosInf, 'NaN': NaN, 'true': True, 'false': False, 'null': None, } def JSONConstant(match, context, c=_CONSTANTS): return c[match.group(0)], None pattern('(-?Infinity|NaN|true|false|null)')(JSONConstant) def JSONNumber(match, context): match = JSONNumber.regex.match(match.string, *match.span()) integer, frac, exp = match.groups() if frac or exp: res = float(integer + (frac or '') + (exp or '')) else: res = int(integer) return res, None pattern(r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?')(JSONNumber) STRINGCHUNK = re.compile(r'(.*?)(["\\])', FLAGS) BACKSLASH = { '"': u'"', '\\': u'\\', '/': u'/', 'b': u'\b', 'f': u'\f', 'n': u'\n', 'r': u'\r', 't': u'\t', } DEFAULT_ENCODING = "utf-8" def scanstring(s, end, encoding=None, _b=BACKSLASH, _m=STRINGCHUNK.match): if encoding is None: encoding = DEFAULT_ENCODING chunks = [] _append = chunks.append begin = end - 1 while 1: chunk = _m(s, end) if chunk is None: raise ValueError( errmsg("Unterminated string starting at", s, begin)) end = chunk.end() content, terminator = chunk.groups() if content: if not isinstance(content, unicode): content = unicode(content, encoding) _append(content) if terminator == '"': break try: esc = s[end] except IndexError: raise ValueError( errmsg("Unterminated string starting at", s, begin)) if esc != 'u': try: m = _b[esc] except KeyError: raise ValueError( errmsg("Invalid \\escape: %r" % (esc,), s, end)) end += 1 else: esc = s[end + 1:end + 5] try: m = unichr(int(esc, 16)) if len(esc) != 4 or not esc.isalnum(): raise ValueError except ValueError: raise ValueError(errmsg("Invalid \\uXXXX escape", s, end)) end += 5 _append(m) return u''.join(chunks), end def JSONString(match, context): encoding = getattr(context, 'encoding', None) return scanstring(match.string, match.end(), encoding) pattern(r'"')(JSONString) def JSONObject(match, context, _w=WHITESPACE.match): pairs = {} s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == '}': return pairs, end + 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end)) end += 1 encoding = getattr(context, 'encoding', None) iterscan = JSONScanner.iterscan while True: key, end = scanstring(s, end, encoding) end = _w(s, end).end() if s[end:end + 1] != ':': raise ValueError(errmsg("Expecting : delimiter", s, end)) end = _w(s, end + 1).end() try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) pairs[key] = value end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == '}': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end - 1)) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end - 1)) object_hook = getattr(context, 'object_hook', None) if object_hook is not None: pairs = object_hook(pairs) return pairs, end pattern(r'{')(JSONObject) def JSONArray(match, context, _w=WHITESPACE.match): values = [] s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == ']': return values, end + 1 iterscan = JSONScanner.iterscan while True: try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) values.append(value) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == ']': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end)) end = _w(s, end).end() return values, end pattern(r'\[')(JSONArray) ANYTHING = [ JSONObject, JSONArray, JSONString, JSONConstant, JSONNumber, ] JSONScanner = Scanner(ANYTHING) class JSONDecoder(object): _scanner = Scanner(ANYTHING) __all__ = ['__init__', 'decode', 'raw_decode'] def __init__(self, encoding=None, object_hook=None): self.encoding = encoding self.object_hook = object_hook def decode(self, s, _w=WHITESPACE.match): obj, end = self.raw_decode(s, idx=_w(s, 0).end()) end = _w(s, end).end() if end != len(s): raise ValueError(errmsg("Extra data", s, end, len(s))) return obj def raw_decode(self, s, **kw): kw.setdefault('context', self) try: obj, end = self._scanner.iterscan(s, **kw).next() except StopIteration: raise ValueError("No JSON object could be decoded") return obj, end class simplejson : def __init__ (self) : pass def dump(self, obj, fp, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder iterable = cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).iterencode(obj) for chunk in iterable: fp.write(chunk) def dumps(self, obj, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder return cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).encode(obj) def load(self, fp, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(fp.read()) def loads(self, s, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(s) # # extlib : s60-simplehttp.py # import httplib import base64 class simplehttp : def __init__ (self, creds=None) : self.__auth__ = None if not creds == None : user = creds[0] pswd = creds[1] self.__auth__ = base64.encodestring(user + ":" + pswd) def get (self, host, path, headers={}) : return self.execute_http_request('GET', host, path, headers) def getstore (self, url, path='') : import md5 import urllib if path == '' : path = md5.md5(url).hexdigest() if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : path = 'e:\\%s' % path else : path = 'c:\\%s' % path try : urllib.urlretrieve(url, unicode(path)) except Exception, e : raise Exception("Failed to retrieve image, %s" % e) return path def post (self, host, path, params=None, headers={}) : return self.execute_http_request('POST', host, path, params) def execute_http_request(self, meth, host, path, params=None, headers={}) : if self.__auth__ : headers['Authorization'] = "Basic %s" % self.__auth__ c = httplib.HTTPConnection(host) c.request(meth, path, params, headers) r = c.getresponse() return r.read() # # extlib : s60-simplestore.py # import e32 class simplestore : def __init__ (self, path) : self.__path__ = path self.__db__ = None self.open() def open (self) : if e32.in_emulator() : import anydbm self.__db__ = anydbm.open(self.__path__, 'c') else : import e32dbm self.__db__ = e32dbm.open(self.__path__, 'c') def close (self) : self.__db__.close() self.__db__ = None def read (self, key) : key = str(key) if self.__db__.has_key(key) : return self.__db__[key] return None def write (self, key, value) : key = str(key) value = str(value) self.__db__[key] = value self.save() def save (self) : self.close() self.open() # # extlib : s60-simplestikkit.py # import urllib class simplestikkit : def __init__ (self, apikey) : self.__key__ = apikey self.__host__ = "api.stikkit.com" def stikkits(self, kw_args={}) : url = "/stikkits.json" json = self.fetch(url, kw_args) return json def stikkit (self, id) : url = '/stikkits/%s.json' % id json = self.fetch(url) return json def create (self, txt) : path = "/stikkits"; params = {'api_key' : self.__key__, 'raw_text':txt} try : http = simplehttp() http.post(self.__host__, path, urllib.urlencode(params)) except Exception, e : raise Exception, e return True def fetch(self, url, kwargs={}) : kwargs['api_key'] = self.__key__; path = '%s?%s' % (url, urllib.urlencode(kwargs)) try : http = simplehttp() json = http.get(self.__host__, path) except Exception, e : raise Exception, e try : sj = simplejson() return sj.loads(json) except Exception, e : raise Exception, e # # extlib : s60-simpleapp.py # import e32 import appuifw class simpleapp : def __init__ (self) : self.__log__ = u"" self.__kill__ = False self.__poll__ = False self.__current__ = u"" self.__activity__ = [] if self.use_lock() : self.__lock__ = e32.Ao_lock() appuifw.app.exit_key_handler = self.abort def abort (self) : self.__kill__ = True self.__lock__.signal() def loop (self) : if self.setup() : self.run() if self.use_lock() : self.__lock__.wait() def setup (self) : return True def run (self) : self.poll() def poll (self) : if self.__poll__ : self.error("Already polling") return False self.__poll__ = True self.do_poll() self.__poll__ = False self.sleep() def do_poll (self) : pass def sleep (self, secs=600) : self.record_activity("Sleeping for %s seconds" % secs) if e32.in_emulator() : return time.sleep(secs) self.poll() else : timer = e32.Ao_timer() timer.after(secs, self.poll) self.__timer__ = timer def use_lock (self) : if e32.in_emulator() : return True return False def log (self, msg, reset=False) : if reset : self.__log__ = unicode("") self.__log__ += "%s\n" % msg self.write(unicode(self.__log__)) def write(self, txt) : if e32.in_emulator() : t = appuifw.Text() t.write(txt) appuifw.app.body = t else : appuifw.app.body = appuifw.Text(txt) def announce (self, msg, type='info') : self.note(msg, type, True) def notice (self, msg) : self.note(msg, 'info') def error (self, msg) : self.note(msg, 'error') def note (self, msg, type, gbl=False) : if not e32.in_emulator and gbl : appuifw.note(unicode(msg), type, gbl) else : appuifw.note(unicode(msg), type) def prompt (self, query, type="text") : if e32.in_emulator() : return raw_input("%s : " % query) return appuifw.query(unicode(query), type) def record_activity (self, msg, display=True) : self.__activity__.append(unicode("* %s" % msg)) if display : self.display_activity() def display_activity (self) : msg = "\n".join(self.__activity__) reset = True self.log(msg, reset) def switch_to_app (self) : self.display_app() self.setup_app_menu() def display_app (self) : reset = True self.log(self.__current__, reset) def switch_to_activity (self) : self.display_activity() self.setup_activity_menu() def setup_activity_menu (self) : pass def setup_app_menu (self) : pass def setup_simplestore(self, storename) : storepath = storename if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\%s' % storename else : storepath = 'c:\\%s' % storename try : self.__store__ = simplestore(storepath) except Exception, e : self.error("Failed to open local datastore, %s" % e) return False return True def setup_access_point (self) : if e32.in_emulator() : return True import socket try : apid = socket.select_access_point() apnt = socket.access_point(apid) socket.set_default_access_point(apnt) except Exception, e : self.error("Failed to setup access point : %s" % e) return False return True def reset_access_point (self) : if self.setup_access_point() : self.notice("Your access point has been reset") def vibrate (self) : try : import misty misty.vibrate(500, 100) except Exception, e : return False # python ~/lib/python/s60-tools/s60-tools.py -f generators -c buildify -p ~/lib/python -i ./s60-stikkitapp.py > ./s60-stikkitapp_b.py # /usr/local/bin/ensymble.py py2sis --caps="LocalServices+NetworkServices+ReadUserData+WriteUserData+UserEnvironment" --appname='Stikkit' --version='0.1' --verbose ./s60-stikkitapp_b.py ./Stikkit.SIS class stikkitapp (simpleapp) : def __init__ (self) : simpleapp.__init__(self) self.__auth__ = None self.__stikkit__ = None self.__store__ = None self.__list__ = {} self.__items__ = {} self.__post__ = '' # # # def setup (self) : storename = 's60_stikkitapp' if not self.setup_simplestore(storename) : return False # if not self.setup_access_point() : return False # try : self.setup_stikkit() except Exception, e : self.error("Failed to create Stikkit object : %s" % e) return False self.setup_menu() self.record_activity("Setup complete") return True # # # def setup_stikkit (self) : keys = [('api_key', u"your stikkit api key")] auth = self.setup_auth(keys) self.__auth__ = auth if not auth : raise Exception, "Failed to retrieve API key" self.__stikkit__ = simplestikkit(auth['api_key']) return True # # Please to put me in s60-simpleauth.py # def setup_auth (self, keys, force=False) : auth = {} for pair in keys : key = pair[0] label = pair[1] if force or not self.__store__.read(key) : value = self.prompt(label) if value == None : return None self.__store__.write(key, value) auth[key] = self.__store__.read(key) return auth # # # def setup_menu (self) : items = self.menu_items() appuifw.app.menu = items # # # def menu_items (self) : items =[] items.append((u"Show Stikkit", self.show_stikkit)) items.append((u"Create Stikkit", self.create_stikkit)) items.append((u"Refresh Stikkit List", self.refresh_stikkits)) # reset auth items.append((u"Reset network connection", self.reset_access_point)) return items # # # def save_hook (self, state) : self.__post__ = state[0][2] return True # # # def create_stikkit (self) : self.__post__ = '' params = [(u"body", "text")] form = appuifw.Form(params, appuifw.FFormEditModeOnly) form.save_hook = self.save_hook form.execute() if self.__post__ == '' : return self.record_activity("posting to stikkit") try : self.__stikkit__.create(self.__post__) except Exception, e : self.error("Failed to create post : %s" % e) return False self.record_activity("your stikkit has been stukked") self.refresh_stikkits() # # # def show_stikkit (self) : id = self.choose_stikkit() if not id : return self.record_activity("retrieving stikkit id %s" % id) st = self.fetch_stikkit(id) self.record_activity("stikkit post retrieved") self.log(st['text'], True) # # # def choose_stikkit (self) : list = self.fetch_stikkits() if not list : return None choices = [] ids = {} i = 0 for st in list : choices.append(st['name']) ids[i] = st['id'] i += 1 idx = appuifw.popup_menu(choices) if idx == None : return None return ids[idx] # # # def refresh_stikkits (self) : self.__list__ = {} self.__items__ = {} self.fetch_stikkits() # # # def fetch_stikkits (self) : if len(self.__list__) == 0 : self.record_activity("retrieving stikkit list") list = self.__stikkit__.stikkits() if not list : return None self.__list__ = list self.record_activity("list retrieved") return self.__list__ # # # def fetch_stikkit (self, id) : if not self.__items__.has_key(id) : st = self.__stikkit__.stikkit(id) self.__items__[id] = st return self.__items__[id] # # # if __name__ == "__main__" : app = stikkitapp() app.loop()