Skip to content

Commit dac5b88

Browse files
committed
broker.py: Validate subscriptions.
1 parent 596e463 commit dac5b88

File tree

3 files changed

+53
-27
lines changed

3 files changed

+53
-27
lines changed

v3/docs/DRIVERS.md

+28-23
Original file line numberDiff line numberDiff line change
@@ -1173,14 +1173,10 @@ The `topic` arg is typically a string but may be any hashable object. A
11731173

11741174
#### Agent types
11751175

1176-
An `agent` may be any of the following:
1177-
1178-
* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If
1179-
extra args were passed on subscription the queue receives a 3-tuple.
1180-
`(topic, message, (args...))`.
1181-
* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`.
1182-
If extra args were passed on subscription it receives a 3-tuple,
1183-
`(topic, message, (args...))`.
1176+
An `agent` may be an instance of any of the following:
1177+
1178+
* `RingbufQueue` Received messages are queued as a 2-tuple `(topic, message)`.
1179+
* `Queue` Received messages are queued as a 2-tuple `(topic, message)`.
11841180
* `function` Called when a message is received. Args: topic, message plus any
11851181
further args.
11861182
* `bound method` Called when a message is received. Args: topic, message plus any
@@ -1193,7 +1189,8 @@ message plus any further args.
11931189
* `Event` Set when a message is received.
11941190

11951191
Note that synchronous `agent` instances must run to completion quickly otherwise
1196-
the `publish` method will be slowed.
1192+
the `publish` method will be slowed. See [Notes](./DRIVERS.md#93-notes) for
1193+
further details on queue behaviour.
11971194

11981195
#### Broker class variable
11991196

@@ -1202,24 +1199,25 @@ the `publish` method will be slowed.
12021199
#### example
12031200
```py
12041201
import asyncio
1205-
from primitives import Broker, Queue
1202+
from primitives import Broker, RingbufQueue
12061203

12071204
broker = Broker()
1208-
queue = Queue() # Or (e.g. RingbufQueue(20))
1205+
queue = RingbufQueue(20)
12091206
async def sender(t):
12101207
for x in range(t):
12111208
await asyncio.sleep(1)
12121209
broker.publish("foo_topic", f"test {x}")
12131210

1211+
async def receiver():
1212+
async for topic, message in queue:
1213+
print(topic, message)
1214+
12141215
async def main():
12151216
broker.subscribe("foo_topic", queue)
1216-
n = 10
1217-
asyncio.create_task(sender(n))
1218-
print("Letting queue part-fill")
1219-
await asyncio.sleep(5)
1220-
for _ in range(n):
1221-
topic, message = await queue.get()
1222-
print(topic, message)
1217+
rx = asyncio.create_task(receiver())
1218+
await sender(10)
1219+
await asyncio.sleep(2)
1220+
rx.cancel()
12231221

12241222
asyncio.run(main())
12251223
```
@@ -1236,7 +1234,8 @@ async def messages(client):
12361234
Assuming the MQTT client is subscribed to multiple topics, message strings are
12371235
directed to individual tasks each supporting one topic.
12381236

1239-
The following illustrates a use case for `agent` args.
1237+
The following illustrates a use case for passing args to an `agent` (pin nos.
1238+
are for Pyoard 1.1).
12401239
```py
12411240
import asyncio
12421241
from primitives import Broker
@@ -1319,17 +1318,23 @@ If a message causes a queue to fill, a message will silently be lost. It is the
13191318
responsibility of the subscriber to avoid this. In the case of a `Queue`
13201319
instance the lost message is the one causing the overflow. In the case of
13211320
`RingbufQueue` the oldest message in the queue is discarded. In some
1322-
applications this behaviour is preferable.
1321+
applications this behaviour is preferable. In general `RingbufQueue` is
1322+
preferred as it is optimised for microcontroller use and supports retrieval by
1323+
an asynchronous iterator.
1324+
1325+
If either queue type is subscribed with args, publications will queued as a
1326+
3-tuple `(topic, message, (args...))`. There is no obvious use case for this.
13231327

13241328
#### exceptions
13251329

13261330
An instance of an `agent` objects is owned by a subscribing tasks but is
13271331
executed by a publishing task. If a function used as an `agent` throws an
13281332
exception, the traceback will point to a `Broker.publish` call.
13291333

1330-
The `Broker` class does not throw exceptions. There are a number of non-fatal
1331-
conditions which can occur such as a queue overflow or an attempt to unsubscribe
1332-
an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`.
1334+
The `Broker` class throws a `ValueError` if `.subscribe` is called with an
1335+
invalid `agent` type. There are a number of non-fatal conditions which can occur
1336+
such as a queue overflow or an attempt to unsubscribe an `agent` twice. The
1337+
`Broker` will report these if `Broker.Verbose=True`.
13331338

13341339
###### [Contents](./DRIVERS.md#0-contents)
13351340

v3/primitives/broker.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,22 @@ class Agent:
1414
pass
1515

1616

17+
def _validate(a):
18+
return (
19+
isinstance(a, asyncio.Event)
20+
or isinstance(a, Queue)
21+
or isinstance(a, RingbufQueue)
22+
or isinstance(a, Agent)
23+
or callable(a)
24+
)
25+
26+
1727
class Broker(dict):
1828
Verbose = True
1929

2030
def subscribe(self, topic, agent, *args):
31+
if not _validate(agent):
32+
raise ValueError("Invalid agent:", agent)
2133
aa = (agent, args)
2234
if not (t := self.get(topic, False)):
2335
self[topic] = {aa}
@@ -51,7 +63,7 @@ def publish(self, topic, message):
5163
try:
5264
agent.put_nowait(t if args else t[:2])
5365
except Exception: # Queue discards current message. RingbufQueue discards oldest
54-
Broker.verbose and print(f"Message lost topic {topic} message {message}")
66+
Broker.Verbose and print(f"Message lost topic {topic} message {message}")
5567
continue
5668
# agent is function, method, coroutine or bound coroutine
5769
res = agent(topic, message, *args)

v3/primitives/tests/broker_test.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async def print_ringbuf_q(q):
5959

6060

6161
async def main():
62+
Broker.Verbose = False # Suppress q full messages
6263
tc = TestClass()
6364
q = Queue(10)
6465
rq = RingbufQueue(10)
@@ -96,8 +97,6 @@ async def main():
9697
print()
9798
print("Unsubscribing method")
9899
broker.unsubscribe("foo_topic", tc.get_data) # Async method
99-
# print("Pause 5s")
100-
# await asyncio.sleep(5)
101100
print("Retrieving foo_topic messages from Queue")
102101
try:
103102
await asyncio.wait_for(print_queue(q), 5)
@@ -108,9 +107,19 @@ async def main():
108107
await asyncio.wait_for(print_ringbuf_q(rq), 5)
109108
except asyncio.TimeoutError:
110109
print("Timeout")
111-
print("Check error on invalid unsubscribe")
110+
print()
111+
print("*** Testing error reports and exception ***")
112+
print()
113+
Broker.Verbose = True
114+
print("*** Check error on invalid unsubscribe ***")
112115
broker.unsubscribe("rats", "more rats") # Invalid topic
113116
broker.unsubscribe("foo_topic", "rats") # Invalid agent
117+
print("*** Check exception on invalid subscribe ***")
118+
try:
119+
broker.subscribe("foo_topic", "rubbish_agent")
120+
print("Test FAIL")
121+
except ValueError:
122+
print("Test PASS")
114123

115124

116125
asyncio.run(main())

0 commit comments

Comments
 (0)