GEOFFREY v0.1.2 Documentation

geoffrey.hub

Contents

Source code for geoffrey.hub

"""
The EventHUB and related functions..

"""
#pylint: disable=I0011, E1101, W0212, W0603, C0103
from asyncio.queues import QueueFull
import asyncio
import logging
import pickle

from geoffrey.data import State, Event, EventType

# Global, implicit hub
_hub = None

logger = logging.getLogger(__name__)


[docs]class EventHUB: """The main data exchanger.""" instance = None def __init__(self): self.events = asyncio.Queue() self.subscriptions = [] self.running = False self.states = {}
[docs] def add_subscriptions(self, subscriptions): """Extend the subscription list with the given `subscriptions`.""" self.subscriptions.extend(subscriptions)
[docs] def get_states(self, key_criteria): """Generator with the matching states of the key_criteria.""" kc = key_criteria._asdict().items() for stk, stv in self.states.items(): # exv: Expected Value == None -means I don't care- if all(exv is None or getattr(stk, exk) == exv for exk, exv in kc): yield State.from_keyvalue(stk, stv)
[docs] def get_one_state(self, key_criteria): """Like get_states but return the first matching state or None.""" try: return next(self.get_states(key_criteria)) except StopIteration: return None
@asyncio.coroutine
[docs] def run(self): """Infite loop that send events to the subscribers.""" logger.debug("Starting EventHUB!") if not self.running: self.running = True else: raise RuntimeError("HUB run method can't be exec twice.") while True: data = yield from self.events.get() logger.debug("Sending %s to %d subscriptions", data, len(self.subscriptions)) for subscription in self.subscriptions: try: subscription.put_nowait(data) except QueueFull: pass
def _process_state(self, state, force_change): """ Process the given `state` and produce events according to the rules. """ logger.debug("State received: %s", state) key, value = state.to_keyvalue() if key in self.states: # Key already exists. if value: if force_change or value != self.states[key]: # Modified value self.states[key] = value event = state.to_event(EventType.modified) return (False, event) else: # Same value. # (It's covered but coverage does not detect it.) return (None, None) # pragma: nocover else: # No value means deletion. del self.states[key] event = Event.from_keyvalue(key, None, type=EventType.deleted) return (False, event) elif value: # New value. Creation self.states[key] = value event = state.to_event(type=EventType.created) return (False, event) else: # No value means. Deletion. But is an unknown key. # (It's covered but coverage does not detect it.) return (None, None) # pragma: nocover def _process_data(self, data, force_change): """ Process the data received by the hub. Return a tuple (isfinal, event) => (bool, Event). """ if isinstance(data, Event): return (True, data) elif isinstance(data, State): return self._process_state(data, force_change=force_change) else: raise TypeError("Unknown data type.") @asyncio.coroutine
[docs] def put(self, data, force_change=False): """ Put a state or event in the hub. This method is a coroutine, use:: yield from hub.put(data) """ isfinal, event = self._process_data(data, force_change=force_change) if event is not None: if isfinal: yield from self.events.put(event) else: yield from self.put(event, force_change=force_change)
[docs] def put_nowait(self, data, force_change=False): """ Put a state or event in the hub. This method is NOT a coroutine, use:: hub.put_nowait(data) """ isfinal, event = self._process_data(data, force_change=force_change) if event is not None: if isfinal: self.events.put_nowait(event) else: self.put_nowait(event, force_change=force_change)
[docs] def save_states(self, filename): """Save the state table to disk.""" with open(filename, 'wb') as outputfile: pickle.dump(self.states, outputfile)
[docs] def restore_states(self, filename): """Load the state table from disk.""" with open(filename, 'rb') as inputfile: self.states = pickle.load(inputfile)
def get_hub(): """Return the global event hub.""" global _hub if _hub is None: _hub = EventHUB() return _hub

Contents