aioetcd3/test/test_watch.py

411 lines
16 KiB
Python

import unittest
import functools
import asyncio
from grpc import RpcError
from aioetcd3.client import client
from aioetcd3.help import range_all, range_prefix, PER_RW
from aioetcd3.watch import EVENT_TYPE_CREATE,EVENT_TYPE_DELETE,EVENT_TYPE_MODIFY,\
CompactRevisonException, WatchException
from .utils import switch_auth_off, switch_auth_on
def asynctest(f):
    """Decorate a coroutine test method so unittest can call it synchronously.

    The returned wrapper takes only ``self``, runs ``f(self)`` to completion
    on the current event loop and returns the coroutine's result.

    The current loop is reused whenever one is available, so objects created
    in ``setUp`` stay bound to the loop used by the test and ``tearDown``.
    If no current loop is set (``asyncio.get_event_loop()`` raises
    ``RuntimeError``) or the current loop has been closed, a new loop is
    created and installed as the current one.
    """
    @functools.wraps(f)
    def _f(self):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop in this thread.
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(f(self))
    return _f
class WatchTest(unittest.TestCase):
    """Watch tests for the aioetcd3 client, run against ``127.0.0.1:2379``.

    Every scenario is written as a plain coroutine method (``common_watch1``,
    ``watch_reconnect``, ...) so that it can be run twice: directly by a
    ``test_*`` method, and through ``_run_test_with_auth`` by the matching
    ``test_*_with_auth`` method.
    """

    @asynctest
    async def setUp(self):
        """Create the default client and delete every key before the test."""
        self.endpoints = "127.0.0.1:2379"
        self.client = client(endpoint=self.endpoints)
        await self.cleanUp()

    async def common_watch1(self):
        """Run four watchers on ``/foo`` through a put / put / delete sequence.

        Each watcher resolves its own future once its watch is set up; the
        writes are only issued after all four futures are resolved.
        """
        f1 = asyncio.get_event_loop().create_future()

        async def watch_1():
            # Scoped watch: expects create, modify, delete in that order.
            i = 0
            async with self.client.watch_scope('/foo') as response:
                f1.set_result(None)
                async for event in response:
                    i = i + 1
                    if i == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                    elif i == 2:
                        self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo1')
                    elif i == 3:
                        self.assertEqual(event.type, EVENT_TYPE_DELETE)
                        self.assertEqual(event.key, b'/foo')
                        # A delete event has no value, so event.value is not checked.
                        break

        f2 = asyncio.get_event_loop().create_future()

        async def watch_2():
            # Plain watch with prev_kv: also checks the previous value on modify.
            i = 0
            async for event in self.client.watch('/foo', prev_kv=True, create_event=True):
                if event is None:
                    # With create_event=True a None item is yielded first;
                    # presumably it marks the watch as created - confirm in aioetcd3.watch.
                    f2.set_result(None)
                    continue
                i = i + 1
                if i == 1:
                    self.assertEqual(event.type, EVENT_TYPE_CREATE)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo')
                elif i == 2:
                    self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo1')
                    self.assertEqual(event.pre_value, b'foo')
                elif i == 3:
                    self.assertEqual(event.type, EVENT_TYPE_DELETE)
                    self.assertEqual(event.key, b'/foo')
                    # A delete event has no value, so event.value is not checked.
                    break

        f3 = asyncio.get_event_loop().create_future()

        async def watch_3():
            # noput=True: the first event seen must already be the delete.
            i = 0
            async for event in self.client.watch('/foo', prev_kv=True, noput=True,
                                                 create_event=True):
                if event is None:
                    f3.set_result(None)
                    continue
                i = i + 1
                if i == 1:
                    self.assertEqual(event.type, EVENT_TYPE_DELETE)
                    self.assertEqual(event.key, b'/foo')
                    # A delete event has no value, so event.value is not checked.
                    break

        f4 = asyncio.get_event_loop().create_future()

        async def watch_4():
            # nodelete=True: only create and modify are expected.
            i = 0
            async for event in self.client.watch('/foo', prev_kv=True, nodelete=True,
                                                 create_event=True):
                if event is None:
                    f4.set_result(None)
                    continue
                i = i + 1
                if i == 1:
                    self.assertEqual(event.type, EVENT_TYPE_CREATE)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo')
                elif i == 2:
                    self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                    self.assertEqual(event.key, b'/foo')
                    self.assertEqual(event.value, b'foo1')
                    self.assertEqual(event.pre_value, b'foo')
                    break

        w1 = asyncio.ensure_future(watch_1())
        w2 = asyncio.ensure_future(watch_2())
        w3 = asyncio.ensure_future(watch_3())
        w4 = asyncio.ensure_future(watch_4())
        # Do not write anything until every watcher reported that it is set up.
        await asyncio.wait_for(asyncio.wait([f1, f2, f3, f4]), 2)
        await self.client.put('/foo', 'foo')
        await self.client.put('/foo', 'foo1')
        await self.client.delete('/foo')
        done, pending = await asyncio.wait([w1, w2, w3, w4], timeout=20)
        # A watcher still pending after the timeout never saw its expected
        # events: stop it so it does not leak, and fail below instead of
        # silently passing.
        for t in pending:
            t.cancel()
        for t in done:
            # Re-raise any assertion failure raised inside a watcher.
            t.result()
        self.assertFalse(pending, "watchers did not finish within 20 seconds")

    @asynctest
    async def test_watch_1(self):
        await self.common_watch1()

    async def watch_reconnect(self):
        """Check that a scoped watch keeps delivering events across ``update_server_list``."""
        f1 = asyncio.get_event_loop().create_future()
        f2 = asyncio.get_event_loop().create_future()

        async def watch_1():
            i = 0
            async with self.client.watch_scope('/foo') as response:
                f1.set_result(None)
                async for event in response:
                    i = i + 1
                    if i == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                        f2.set_result(None)
                    elif i == 2:
                        self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo1')
                    elif i == 3:
                        self.assertEqual(event.type, EVENT_TYPE_DELETE)
                        self.assertEqual(event.key, b'/foo')
                        # A delete event has no value, so event.value is not checked.
                        break

        t1 = asyncio.ensure_future(watch_1())
        await f1
        await self.client.put('/foo', 'foo')
        # Wait until the first event was received before touching the server list.
        await f2
        # Re-applies the same endpoint; presumably this makes the client
        # reconnect (per the test name) - confirm in the client implementation.
        self.client.update_server_list(self.endpoints)
        await self.client.put('/foo', 'foo1')
        await self.client.delete('/foo')
        await t1

    @asynctest
    async def test_watch_reconnect(self):
        await self.watch_reconnect()

    async def watch_create_cancel(self):
        """Create and cancel many watches in bulk, then check the watch task goes away.

        Five rounds of 200 watch tasks are started; each round cancels them
        in three slices with ``update_server_list`` and short sleeps in
        between.  ``client._watch_task_running`` must be set after each round
        and must be ``None`` three seconds after the last round.
        """
        async def watch_1():
            # Opens a watch scope and leaves it immediately.
            async with self.client.watch_scope('/foo') as _:
                pass

        async def watch_2():
            # Holds the watch scope open long enough to be cancelled.
            async with self.client.watch_scope('/foo') as _:
                await asyncio.sleep(5)

        for _ in range(0, 5):
            watches = [asyncio.ensure_future(watch_1() if i % 2 else watch_2())
                       for i in range(0, 200)]
            await asyncio.sleep(1)
            for w in watches[::3]:
                w.cancel()
            self.client.update_server_list(self.endpoints)
            await asyncio.sleep(0.01)
            for w in watches[1::3]:
                w.cancel()
            await asyncio.sleep(0.3)
            for w in watches[2::3]:
                w.cancel()
            await asyncio.wait_for(asyncio.wait(watches), 3)
            # return_exceptions=True: finished tasks yield None, the others
            # yield their exception object instead of raising here.
            results = await asyncio.gather(*watches, return_exceptions=True)
            print("Finished:", len([r for r in results if r is None]), "Cancelled:", len([r for r in results if r is not None]))
            self.assertIsNotNone(self.client._watch_task_running)
        await asyncio.sleep(3)
        self.assertIsNone(self.client._watch_task_running)

    @asynctest
    async def test_watch_create_cancel(self):
        await self.watch_create_cancel()

    async def batch_events(self):
        """Compare per-event delivery with ``batch_events=True`` delivery.

        The same writes (a two-put transaction, two single puts, a two-delete
        transaction) are observed by one watcher that receives single events
        and one that receives them grouped in batches.
        """
        f1 = asyncio.get_event_loop().create_future()
        f2 = asyncio.get_event_loop().create_future()

        def _check_event(e, criterias):
            # criterias is (type, key, value); a falsy entry is not checked.
            if criterias[0]:
                self.assertEqual(e.type, criterias[0])
            if criterias[1]:
                self.assertEqual(e.key, criterias[1])
            if criterias[2]:
                self.assertEqual(e.value, criterias[2])

        async def watch_1():
            asserts = [(EVENT_TYPE_CREATE, b'/foo/1', b'1'),
                       (EVENT_TYPE_CREATE, b'/foo/2', b'2'),
                       (EVENT_TYPE_MODIFY, b'/foo/1', b'2'),
                       (EVENT_TYPE_MODIFY, b'/foo/2', b'3'),
                       (EVENT_TYPE_DELETE, b'/foo/1', None),
                       (EVENT_TYPE_DELETE, b'/foo/2', None)]
            async with self.client.watch_scope(range_prefix('/foo/')) as response:
                f1.set_result(None)
                async for e in response:
                    _check_event(e, asserts.pop(0))
                    if not asserts:
                        break

        async def watch_2():
            # One inner tuple per expected batch.
            asserts = [((EVENT_TYPE_CREATE, b'/foo/1', b'1'),
                        (EVENT_TYPE_CREATE, b'/foo/2', b'2'),),
                       ((EVENT_TYPE_MODIFY, b'/foo/1', b'2'),),
                       ((EVENT_TYPE_MODIFY, b'/foo/2', b'3'),),
                       ((EVENT_TYPE_DELETE, b'/foo/1', None),
                        (EVENT_TYPE_DELETE, b'/foo/2', None))]
            async with self.client.watch_scope(range_prefix('/foo/'), batch_events=True) \
                    as response:
                f2.set_result(None)
                async for es in response:
                    batch = asserts.pop(0)
                    self.assertEqual(len(es), len(batch))
                    for e, a in zip(es, batch):
                        _check_event(e, a)
                    if not asserts:
                        break

        t1 = asyncio.ensure_future(watch_1())
        t2 = asyncio.ensure_future(watch_2())
        await asyncio.wait_for(asyncio.wait([f1, f2]), 2)
        self.assertTrue((await self.client.txn([], [self.client.put.txn('/foo/1', '1'),
                                                    self.client.put.txn('/foo/2', '2')], []))[0])
        await self.client.put('/foo/1', '2')
        await self.client.put('/foo/2', '3')
        self.assertTrue((await self.client.txn([], [self.client.delete.txn('/foo/1'),
                                                    self.client.delete.txn('/foo/2')], []))[0])
        await asyncio.gather(t1, t2)

    @asynctest
    async def test_batch_events(self):
        await self.batch_events()

    async def compact_revision(self):
        """Watch from a revision that has been compacted away.

        Without ``ignore_compact`` the watch must raise
        ``CompactRevisonException`` carrying the compact revision; with
        ``ignore_compact=True`` it must deliver the event at that revision.
        """
        await self.client.put('/foo', '1')
        first_revision = self.client.last_response_info.revision
        await self.client.put('/foo', '2')
        await self.client.put('/foo', '3')
        await self.client.put('/foo', '4')
        await self.client.put('/foo', '5')
        compact_revision = self.client.last_response_info.revision
        await self.client.compact(compact_revision, True)

        async def watch_1():
            async with self.client.watch_scope('/foo', start_revision=first_revision) as response:
                with self.assertRaises(CompactRevisonException) as cm:
                    async for e in response:
                        # Receiving any event means the exception was not raised.
                        raise ValueError("Not raised")
                self.assertEqual(cm.exception.revision, compact_revision)

        async def watch_2():
            async with self.client.watch_scope('/foo', ignore_compact=True,
                                               start_revision=first_revision) as responses:
                async for e in responses:
                    self.assertEqual(e.type, EVENT_TYPE_MODIFY)
                    self.assertEqual(e.key, b'/foo')
                    self.assertEqual(e.value, b'5')
                    self.assertEqual(e.revision, compact_revision)
                    break

        await watch_1()
        await watch_2()

    @asynctest
    async def test_compact_revision(self):
        await self.compact_revision()

    async def watch_exception(self):
        """Point the client at an unreachable endpoint while two watches are open.

        The default watch must raise ``WatchException``; the watch opened
        with ``always_reconnect=True`` must go on to receive the modify and
        delete events issued after the real endpoint is restored.
        """
        f1 = asyncio.get_event_loop().create_future()
        f2 = asyncio.get_event_loop().create_future()

        async def watch_1():
            i = 0
            async with self.client.watch_scope('/foo') as response:
                f1.set_result(None)
                with self.assertRaises(WatchException):
                    async for event in response:
                        i = i + 1
                        if i == 1:
                            self.assertEqual(event.type, EVENT_TYPE_CREATE)
                            self.assertEqual(event.key, b'/foo')
                            self.assertEqual(event.value, b'foo')
                            f2.set_result(None)
                        elif i == 2:
                            # A second event means the watch survived: fail.
                            raise ValueError("Not raised")

        f3 = asyncio.get_event_loop().create_future()
        f4 = asyncio.get_event_loop().create_future()

        async def watch_2():
            i = 0
            async with self.client.watch_scope('/foo', always_reconnect=True) as response:
                f3.set_result(None)
                async for event in response:
                    i = i + 1
                    if i == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                        f4.set_result(None)
                    elif i == 2:
                        self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo1')
                    elif i == 3:
                        self.assertEqual(event.type, EVENT_TYPE_DELETE)
                        self.assertEqual(event.key, b'/foo')
                        # A delete event has no value, so event.value is not checked.
                        break

        t1 = asyncio.ensure_future(watch_1())
        t2 = asyncio.ensure_future(watch_2())
        await f1
        await f3
        await self.client.put('/foo', 'foo')
        # Both watchers must have seen the create event before the endpoint changes.
        await f2
        await f4
        fake_endpoints = 'ipv4:///127.0.0.1:49999'
        self.client.update_server_list(fake_endpoints)
        await asyncio.sleep(2)
        self.client.update_server_list(self.endpoints)
        await self.client.put('/foo', 'foo1')
        await self.client.delete('/foo')
        await t1
        await t2

    @asynctest
    async def test_watch_exception(self):
        await self.watch_exception()

    async def _run_test_with_auth(self, test):
        """Run the coroutine function ``test`` with authentication switched on.

        ``self.client`` is replaced by a client logged in as ``client`` /
        ``client`` (granted read-write permission on the ``/foo`` prefix by a
        ``root`` client) for the duration of ``test``.  Afterwards
        authentication is switched off again, the temporary clients are
        closed and the original client is restored - also when the permission
        grant or the test itself fails.
        """
        default_client = self.client
        await switch_auth_on(default_client)
        root_client = client(endpoint=self.endpoints, username="root", password="root")
        try:
            await root_client.role_grant_permission(name='client',
                                                    key_range=range_prefix('/foo'),
                                                    permission=PER_RW)
            self.client = client(endpoint=self.endpoints, username="client", password="client")
            await test()
        finally:
            try:
                await switch_auth_off(
                    root_client,
                    default_client
                )
            finally:
                await root_client.close()
                # self.client is still the default client if the failure
                # happened before the auth client was created; that one is
                # closed by tearDown, not here.
                if self.client is not default_client:
                    await self.client.close()
                    self.client = default_client

    @asynctest
    async def test_watch1_with_auth(self):
        await self._run_test_with_auth(self.common_watch1)

    @asynctest
    async def test_watch_reconnect_with_auth(self):
        await self._run_test_with_auth(self.watch_reconnect)

    @asynctest
    async def test_watch_create_cancel_with_auth(self):
        await self._run_test_with_auth(self.watch_create_cancel)

    @asynctest
    async def test_batch_events_with_auth(self):
        await self._run_test_with_auth(self.batch_events)

    @asynctest
    async def test_compact_revision_with_auth(self):
        await self._run_test_with_auth(self.compact_revision)

    @asynctest
    async def test_watch_exception_with_auth(self):
        await self._run_test_with_auth(self.watch_exception)

    @asynctest
    async def tearDown(self):
        """Delete every key and close the client, even if the cleanup fails."""
        try:
            await self.cleanUp()
        finally:
            await self.client.close()

    async def cleanUp(self):
        """Delete all keys (``range_all()``) through the current client."""
        await self.client.delete(range_all())
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()