import unittest import functools import asyncio from grpc import RpcError from aioetcd3.client import client from aioetcd3.help import range_all, range_prefix, PER_RW from aioetcd3.watch import EVENT_TYPE_CREATE,EVENT_TYPE_DELETE,EVENT_TYPE_MODIFY,\ CompactRevisonException, WatchException from .utils import switch_auth_off, switch_auth_on def asynctest(f): @functools.wraps(f) def _f(self): return asyncio.get_event_loop().run_until_complete(f(self)) return _f class WatchTest(unittest.TestCase): @asynctest async def setUp(self): self.endpoints = "127.0.0.1:2379" self.client = client(endpoint=self.endpoints) await self.cleanUp() async def common_watch1(self): f1 = asyncio.get_event_loop().create_future() async def watch_1(): i = 0 async with self.client.watch_scope('/foo') as response: f1.set_result(None) async for event in response: i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') elif i == 2: self.assertEqual(event.type, EVENT_TYPE_MODIFY) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo1') elif i == 3: self.assertEqual(event.type, EVENT_TYPE_DELETE) self.assertEqual(event.key, b'/foo') # delete event has no value # self.assertEqual(event.value, b'foo1') break f2 = asyncio.get_event_loop().create_future() async def watch_2(): i = 0 async for event in self.client.watch('/foo', prev_kv=True, create_event=True): if event is None: f2.set_result(None) continue i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') elif i == 2: self.assertEqual(event.type, EVENT_TYPE_MODIFY) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo1') self.assertEqual(event.pre_value, b'foo') elif i == 3: self.assertEqual(event.type, EVENT_TYPE_DELETE) self.assertEqual(event.key, b'/foo') # self.assertEqual(event.value, b'foo1') break f3 = asyncio.get_event_loop().create_future() async def watch_3(): i = 0 async for event in self.client.watch('/foo', prev_kv=True, noput=True, create_event=True): if event is None: f3.set_result(None) continue i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_DELETE) self.assertEqual(event.key, b'/foo') # self.assertEqual(event.value, b'foo1') break f4 = asyncio.get_event_loop().create_future() async def watch_4(): i = 0 async for event in self.client.watch('/foo', prev_kv=True, nodelete=True, create_event=True): if event is None: f4.set_result(None) continue i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') elif i == 2: self.assertEqual(event.type, EVENT_TYPE_MODIFY) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo1') self.assertEqual(event.pre_value, b'foo') break w1 = asyncio.ensure_future(watch_1()) w2 = asyncio.ensure_future(watch_2()) w3 = asyncio.ensure_future(watch_3()) w4 = asyncio.ensure_future(watch_4()) await asyncio.wait_for(asyncio.wait([f1, f2, f3, f4]), 2) await self.client.put('/foo', 'foo') await self.client.put('/foo', 'foo1') await self.client.delete('/foo') done, pending = await asyncio.wait([w1, w2, w3, w4], timeout=20) for t in done: t.result() @asynctest async def test_watch_1(self): await self.common_watch1() async def watch_reconnect(self): f1 = asyncio.get_event_loop().create_future() f2 = asyncio.get_event_loop().create_future() async def watch_1(): i = 0 async with self.client.watch_scope('/foo') as response: f1.set_result(None) async for event in response: i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') f2.set_result(None) elif i == 2: self.assertEqual(event.type, EVENT_TYPE_MODIFY) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo1') elif i == 3: self.assertEqual(event.type, EVENT_TYPE_DELETE) self.assertEqual(event.key, b'/foo') # delete event has no value # self.assertEqual(event.value, b'foo1') break t1 = asyncio.ensure_future(watch_1()) await f1 await self.client.put('/foo', 'foo') await f2 self.client.update_server_list(self.endpoints) await self.client.put('/foo', 'foo1') await self.client.delete('/foo') await t1 @asynctest async def test_watch_reconnect(self): await self.watch_reconnect() async def watch_create_cancel(self): async def watch_1(): async with self.client.watch_scope('/foo') as _: pass async def watch_2(): async with self.client.watch_scope('/foo') as _: await asyncio.sleep(5) for _ in range(0, 5): watches = [asyncio.ensure_future(watch_1() if i % 2 else watch_2()) for i in range(0, 200)] await asyncio.sleep(1) for w in watches[::3]: w.cancel() self.client.update_server_list(self.endpoints) await asyncio.sleep(0.01) for w in watches[1::3]: w.cancel() await asyncio.sleep(0.3) for w in watches[2::3]: w.cancel() await asyncio.wait_for(asyncio.wait(watches), 3) results = await asyncio.gather(*watches, return_exceptions=True) print("Finished:", len([r for r in results if r is None]), "Cancelled:", len([r for r in results if r is not None])) self.assertIsNotNone(self.client._watch_task_running) await asyncio.sleep(3) self.assertIsNone(self.client._watch_task_running) @asynctest async def test_watch_create_cancel(self): await self.watch_create_cancel() async def batch_events(self): f1 = asyncio.get_event_loop().create_future() f2 = asyncio.get_event_loop().create_future() def _check_event(e, criterias): if criterias[0]: self.assertEqual(e.type, criterias[0]) if criterias[1]: self.assertEqual(e.key, criterias[1]) if criterias[2]: self.assertEqual(e.value, criterias[2]) async def watch_1(): asserts = [(EVENT_TYPE_CREATE, b'/foo/1', b'1'), (EVENT_TYPE_CREATE, b'/foo/2', b'2'), (EVENT_TYPE_MODIFY, b'/foo/1', b'2'), (EVENT_TYPE_MODIFY, b'/foo/2', b'3'), (EVENT_TYPE_DELETE, b'/foo/1', None), (EVENT_TYPE_DELETE, b'/foo/2', None)] async with self.client.watch_scope(range_prefix('/foo/')) as response: f1.set_result(None) async for e in response: _check_event(e, asserts.pop(0)) if not asserts: break async def watch_2(): asserts = [((EVENT_TYPE_CREATE, b'/foo/1', b'1'), (EVENT_TYPE_CREATE, b'/foo/2', b'2'),), ((EVENT_TYPE_MODIFY, b'/foo/1', b'2'),), ((EVENT_TYPE_MODIFY, b'/foo/2', b'3'),), ((EVENT_TYPE_DELETE, b'/foo/1', None), (EVENT_TYPE_DELETE, b'/foo/2', None))] async with self.client.watch_scope(range_prefix('/foo/'), batch_events=True) \ as response: f2.set_result(None) async for es in response: batch = asserts.pop(0) self.assertEqual(len(es), len(batch)) for e, a in zip(es, batch): _check_event(e, a) if not asserts: break t1 = asyncio.ensure_future(watch_1()) t2 = asyncio.ensure_future(watch_2()) await asyncio.wait_for(asyncio.wait([f1, f2]), 2) self.assertTrue((await self.client.txn([], [self.client.put.txn('/foo/1', '1'), self.client.put.txn('/foo/2', '2')], []))[0]) await self.client.put('/foo/1', '2') await self.client.put('/foo/2', '3') self.assertTrue((await self.client.txn([], [self.client.delete.txn('/foo/1'), self.client.delete.txn('/foo/2')], []))[0]) await asyncio.gather(t1, t2) @asynctest async def test_batch_events(self): await self.batch_events() async def compact_revision(self): await self.client.put('/foo', '1') first_revision = self.client.last_response_info.revision await self.client.put('/foo', '2') await self.client.put('/foo', '3') await self.client.put('/foo', '4') await self.client.put('/foo', '5') compact_revision = self.client.last_response_info.revision await self.client.compact(compact_revision, True) async def watch_1(): async with self.client.watch_scope('/foo', start_revision=first_revision) as response: with self.assertRaises(CompactRevisonException) as cm: async for e in response: raise ValueError("Not raised") self.assertEqual(cm.exception.revision, compact_revision) async def watch_2(): async with self.client.watch_scope('/foo', ignore_compact=True, start_revision=first_revision) as responses: async for e in responses: self.assertEqual(e.type, EVENT_TYPE_MODIFY) self.assertEqual(e.key, b'/foo') self.assertEqual(e.value, b'5') self.assertEqual(e.revision, compact_revision) break await watch_1() await watch_2() @asynctest async def test_compact_revision(self): await self.compact_revision() async def watch_exception(self): f1 = asyncio.get_event_loop().create_future() f2 = asyncio.get_event_loop().create_future() async def watch_1(): i = 0 async with self.client.watch_scope('/foo') as response: f1.set_result(None) with self.assertRaises(WatchException): async for event in response: i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') f2.set_result(None) elif i == 2: raise ValueError("Not raised") f3 = asyncio.get_event_loop().create_future() f4 = asyncio.get_event_loop().create_future() async def watch_2(): i = 0 async with self.client.watch_scope('/foo', always_reconnect=True) as response: f3.set_result(None) async for event in response: i = i + 1 if i == 1: self.assertEqual(event.type, EVENT_TYPE_CREATE) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo') f4.set_result(None) elif i == 2: self.assertEqual(event.type, EVENT_TYPE_MODIFY) self.assertEqual(event.key, b'/foo') self.assertEqual(event.value, b'foo1') elif i == 3: self.assertEqual(event.type, EVENT_TYPE_DELETE) self.assertEqual(event.key, b'/foo') # delete event has no value # self.assertEqual(event.value, b'foo1') break t1 = asyncio.ensure_future(watch_1()) t2 = asyncio.ensure_future(watch_2()) await f1 await f3 await self.client.put('/foo', 'foo') await f2 await f4 fake_endpoints = 'ipv4:///127.0.0.1:49999' self.client.update_server_list(fake_endpoints) await asyncio.sleep(2) self.client.update_server_list(self.endpoints) await self.client.put('/foo', 'foo1') await self.client.delete('/foo') await t1 await t2 @asynctest async def test_watch_exception(self): await self.watch_exception() async def _run_test_with_auth(self, test): default_client = self.client await switch_auth_on(default_client) root_client = client(endpoint=self.endpoints, username="root", password="root") await root_client.role_grant_permission(name='client', key_range=range_prefix('/foo'), permission=PER_RW) self.client = client(endpoint=self.endpoints, username="client", password="client") try: await test() finally: await switch_auth_off( root_client, default_client ) await root_client.close() await self.client.close() self.client = default_client @asynctest async def test_watch1_with_auth(self): await self._run_test_with_auth(self.common_watch1) @asynctest async def test_watch_reconnect_with_auth(self): await self._run_test_with_auth(self.watch_reconnect) @asynctest async def test_watch_create_cancel_with_auth(self): await self._run_test_with_auth(self.watch_create_cancel) @asynctest async def test_batch_events_with_auth(self): await self._run_test_with_auth(self.batch_events) @asynctest async def test_compact_revision_with_auth(self): await self._run_test_with_auth(self.compact_revision) @asynctest async def test_watch_exception_with_auth(self): await self._run_test_with_auth(self.watch_exception) @asynctest async def tearDown(self): await self.cleanUp() await self.client.close() async def cleanUp(self): await self.client.delete(range_all()) if __name__ == '__main__': unittest.main()