from twisted.web2 import resource, http, http_headers, stream import random import string import cjson import time encodejson = cjson.encode decodejson = cjson.decode HTTP_ERROR = 406 class Utils: usedIds = set() #although highly unlikely, no collisions usedIds.add(None) @staticmethod def makeId(chars=32): id = None while id in Utils.usedIds: id = ''.join(random.sample(string.letters+string.digits, chars)) Utils.usedIds.add(id) return id @staticmethod def getTime(): return time.strftime("%Y-%m-%dT%H:%M:%S.00", time.gmtime()) class Connection(object): supported_connection_types = ["callback-polling", "long-polling"] def __init__(self): self.n_connections = 0 self.streams = [] #make sure these are with (stream, jsoncallbac) self.message_queue = [] #array of arrays def addStream(self, message, args, addMessage = None): jsonp = "" if jsonp in args: if args["jsonp"] is not None: jsonp = args["jsonp"] else: jsonp = "jsonp" newstream = stream.ProducerStream() self.streams.append((newstream,jsonp)) amsg = [] if len(self.streams) <= len(self.message_queue): amsg = self.message_queue.pop() if addMessage: amsg.insert(0,addMessage) self.message_queue.append(amsg) print "Created Stream!!" return makeResponse(newstream) def connect(self, message, args): print "message connect %s" % message msg = { "channel": "/meta/connect", "successful":True, "error": None, "clientId": message["clientId"], "advice": {"reconnect": "retry"} } str = self.addStream(message, args, msg) if len(self.message_queue[0]) > 1: self.deliver() return str def flushStreams(self): while len(self.streams): self.deliver() def deliver(self, message = None): #FIXME we might need handling for 0 len messages if message: if len(self.message_queue) == 0: self.message_queue.append([message]) else: self.message_queue[0].append(message) if len(self.streams) and len(self.message_queue[0]): stream,jsoncallback = self.streams.pop(0) print "delivering %s" % ( "%s(%s)" % (jsoncallback, encodejson(self.message_queue[0]))) stream.write( "%s(%s)" % (jsoncallback, encodejson(self.message_queue.pop(0)))) stream.finish() del stream else: "might be delivering eventually %s" % message class ClientSubscription(object): def __init__( self, channel, sendMessageCallback ): self.channel = channel self.sendMessageCallback = sendMessageCallback #Make sure we set this!!! def messageReceived(self, message): self.sendMessage(message) #Make sure you call this first if you override this message!!! def sendMessage(self, message): self.sendMessageCallback(message, self.channel) #Make sure you call this first if you override this message!!! def subscribe(self): pass #when we get disconnected or unsubscribed def unsubscribe(self): pass class Client(object): def __init__(self, id=None, ClientSubscriptionType = ClientSubscription): self.ClientSubscriptionType = ClientSubscriptionType self.connection = Connection() self.subscriptions = {} if id: self.id = id else: self.id = Utils.makeId() def connect(self,msg,args): return self.connection.connect(msg, args) def subscribe(self,msg,args): channel = msg["subscription"] if channel not in self.subscriptions: self.subscriptions[channel] = \ self.ClientSubscriptionType(channel,self.sendMessage) self.subscriptions[channel].subscribe() nmsg = { "channel": "/meta/subscribe", "subscription":channel, "successful": True } return self._subunsubscribe(msg, args, nmsg) else: return {"error":"already subscribed"} def unsubscribe(self,msg,args): channel = msg["subscription"] if channel in self.subscriptions: self.subscriptions[channel].unsubscribe() del self.subscriptions[channel] nmsg = { "channel": "/meta/unsubscribe", "subscription":channel, "successful": True } return self._subunsubscribe(msg, args, nmsg) else: return {"error":"not even subscribed"} def dispatch(self,msg,args): channel = msg["channel"] data = msg["data"] if channel in self.subscriptions: #conn = self.connection.addStream(msg, args) self.subscriptions[channel].messageReceived(data) return {"successful":True,"channel":channel} else: return {"error":"not subscribed to %s" % channel} def _subunsubscribe(self,msg,args,nmsg): self.connection.flushStreams() #stream = self.connection.addStream(msg, args,nmsg) #self.connection.deliver() return nmsg def sendMessage(self, message, channel): if self.connection is not None: self.connection.deliver({"data":message, "channel":channel, "clientId":self.id}) else: #FIXME make own exception raise Exception("No connection? :(") def makeResponse(stream, type="text/javascript", headers={}, code=200): newHeaders = http_headers.Headers() newHeaders.addRawHeader("Content-type", type) for (k,v) in headers.items(): newHeaders.addRawHeader(k, v) print "RESPONSE STREAM: %s" % stream return http.Response(code=code, headers=newHeaders, stream=stream) class ucomet(resource.PostableResource): version = 1.0 min_version = 1.0 def __init__(self, ClientType = Client, ClientSubscriptionType = ClientSubscription): self.clients = {} self.ClientType = ClientType #we will propogate this to the clients self.ClientSubscriptionType = ClientSubscriptionType def render(self, rcx): args = rcx.args messages = None if "message" in args: try: messages = decodejson( args["message"][0] ) except cjson.DecodeError: return makeResponse("Message must be in proper JSON", type="text/plain", code=HTTP_ERROR) else: return makeResponse("There's no message argument", type="text/plain", code=HTTP_ERROR) if type(messages) is not list: return makeResponse("Messages must be encapsulated in array", type="text/plain", code=HTTP_ERROR) print "messages %s" % messages stream = None responses = [] connection = None for m in messages: clientId = None client = None #Let's see if we already have a client if "clientId" in m: clientId = m["clientId"] if clientId in self.clients: client = self.clients[clientId] connection = client.connection #it still may be None if type(m) is not dict: responses.append({"error":"each message must be in dict format"}) print("NOT DICT") continue if "channel" not in m: responses.append({"error":"channel is required in message"}) print("NO CHANNEL") continue channel = m["channel"] if channel == "/meta/handshake": resp = self.handshake(m,args) return resp if channel == "/meta/connect" or \ channel == "/meta/reconnect": print "M %s" %m stream = client.connect(m,args) elif channel == "/meta/subscribe": responses.append(client.subscribe(m,args)) elif channel == "/meta/unsubscribe": responses.append(client.unsubscribe(m,args)) else: responses.append(client.dispatch(m,args)) if not stream or not client: return makeResponse(encodejson(responses)) else: for r in responses: connection.deliver(r) return stream def handshake(self, message, args): #FIXME add error handlign if we're already hand shaken client = self.ClientType(ClientSubscriptionType=self.ClientSubscriptionType) self.clients[client.id] = client return makeResponse(encodejson([{ "channel" :"/meta/handshake", "version" :self.version, "minimumVersion" :self.min_version, "supportedConnectionTypes":Connection.supported_connection_types, "clientId" :client.id, "successful" :True, "error" :None, "id" :message["id"] }]))