Added example answers for chapters 13 and 14 to fix #1

This commit is contained in:
Rick van Hattem 2022-08-31 20:49:02 +02:00
parent 1c106d776c
commit 82cf71ed1c
No known key found for this signature in database
GPG Key ID: E81444E9CE1F695D
20 changed files with 690 additions and 0 deletions

View File

View File

@ -0,0 +1,76 @@
# Try to create an `asyncio` base class that automatically
# registers all instances for easy closing/cleanup when you
# are done
import abc
import asyncio
class AsyncBase(abc.ABC):
    """Base class that records every created instance.

    The `_instances` list lives on this class, so instances of all
    subclasses end up in one shared registry that can be swept in a
    single pass later on.
    """
    _instances = []

    def __init__(self):
        # Register this instance for later bulk cleanup
        self._instances.append(self)

    async def close(self):
        """Subclasses must implement their own cleanup."""
        raise NotImplementedError
class AsyncManager(AsyncBase):
    """Closes every registered `AsyncBase` instance.

    Kept separate from `AsyncBase` so the management helpers do not
    pollute the namespace of the instances themselves.
    """

    @classmethod
    async def close(cls):
        # Pop while closing so the registry is emptied as we go
        while cls._instances:
            instance = cls._instances.pop()
            await instance.close()

    # `async with AsyncManager(): ...` closes everything on exit
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
class A(AsyncBase):
    """Example resource; registers itself via `AsyncBase.__init__`."""

    def __init__(self):
        super().__init__()
        print('A.__init__')

    async def close(self):
        print('A.close')
class B(AsyncBase):
    """Example resource; registers itself via `AsyncBase.__init__`."""

    def __init__(self):
        super().__init__()
        print('B.__init__')

    async def close(self):
        print('B.close')
async def main():
    """Demonstrate closing all registered instances explicitly."""
    print('Using close method directly')
    A()
    B()
    await AsyncManager.close()
    print()
async def main_with():
    """Demonstrate cleanup via the async context manager."""
    print('Using async with')
    async with AsyncManager():
        A()
        B()
    print()


if __name__ == '__main__':
    # Run both demos; each asyncio.run() call gets a fresh event loop
    asyncio.run(main())
    asyncio.run(main_with())

View File

View File

@ -0,0 +1,92 @@
# Create an `asyncio` wrapper class for a synchronous process
# such as file or network operations using executors
# This example shows an `AsyncioFile` class that makes your file
# operations asynchronous by running them in a separate thread.
# If your operation has a tendency to block the Python GIL you
# could also opt for using a ProcessPoolExecutor instead.
#
# Note that for real-life usage I would recommend the aiofiles
# module over this class.
import asyncio
import concurrent.futures
import functools
import pathlib
from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
class AsyncExecutorBase:
    """Mixin that runs blocking callables on a thread pool.

    Must be instantiated from within a running event loop, because
    the loop is captured in `__init__` via `get_running_loop()`.
    """
    _executor: ThreadPoolExecutor
    _loop: AbstractEventLoop

    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor()
        self._loop = asyncio.get_running_loop()
        super().__init__()

    def _run_in_executor(self, func, *args, **kwargs):
        # Not declared `async`, yet awaitable: `run_in_executor()`
        # returns a future that the caller can await directly.
        bound = functools.partial(func, *args, **kwargs)
        return self._loop.run_in_executor(self._executor, bound)
class AsyncioFile(AsyncExecutorBase):
    """Async wrapper around `pathlib.Path` file operations.

    Each method delegates the blocking `pathlib` call to the thread
    pool provided by `AsyncExecutorBase`.
    """
    _path: pathlib.Path

    def __init__(self, path: pathlib.Path):
        super().__init__()
        self._path = path

    async def exists(self) -> bool:
        """Return whether the wrapped path exists."""
        return await self._run_in_executor(self._path.exists)

    async def rename(self, target):
        """Rename the wrapped path to `target`."""
        return await self._run_in_executor(
            self._path.rename,
            target,
        )

    async def read_text(self, encoding=None, errors=None):
        """Read and return the file's contents as a string."""
        return await self._run_in_executor(
            self._path.read_text,
            encoding=encoding,
            errors=errors,
        )

    async def read_bytes(self):
        """Read and return the file's contents as bytes."""
        return await self._run_in_executor(self._path.read_bytes)

    async def write_text(self, data, encoding=None, errors=None,
                         newline=None):
        """Write `data` to the file as text."""
        return await self._run_in_executor(
            self._path.write_text,
            data,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    async def write_bytes(self, data):
        """Write `data` to the file as bytes."""
        return await self._run_in_executor(
            self._path.write_bytes,
            data,
        )
async def main():
    """Demo: check existence of, and print, this script's own source."""
    afile = AsyncioFile(pathlib.Path(__file__))
    print('#' * 79)
    print('Exists:', await afile.exists())
    print('#' * 79)
    print('Contents:')
    print(await afile.read_text())


if __name__ == '__main__':
    asyncio.run(main())

View File

@ -0,0 +1,40 @@
# See if you can make an echo server and client as separate
# processes. Even though we did not cover
# `multiprocessing.Pipe()`, I trust you can work with it
# regardless. It can be created through
# `a, b = multiprocessing.Pipe()` and you can use it with
# `a.send()` or `b.send()` and `a.recv()` or `b.recv()`.
import multiprocessing
def echo_client(receive_pipe, send_pipe, message):
    """Send `message` on `send_pipe` and print the reply from `receive_pipe`."""
    print('client sending', message)
    send_pipe.send(message)
    reply = receive_pipe.recv()
    print('client received', reply)
def echo_server(receive_pipe, send_pipe):
    """Echo every message from `receive_pipe` back on `send_pipe`, forever."""
    while True:
        incoming = receive_pipe.recv()
        print('server received', incoming)
        send_pipe.send(incoming)
if __name__ == '__main__':
    # Fix: the original passed BOTH pipe ends (a, b) to BOTH the
    # server and the clients. Anything sent on `b` is delivered to
    # `a` (and vice versa), so the server could consume its own echo
    # from `a`, and a client could receive its own outgoing message
    # as the "reply". Give each side one exclusive end instead:
    # the client owns `a`, the server owns `b`.
    a, b = multiprocessing.Pipe()
    server = multiprocessing.Process(
        target=echo_server,
        args=(b, b),
    )
    server.start()
    for i in range(5):
        client = multiprocessing.Process(
            target=echo_client,
            args=(a, a, f'message {i}'),
        )
        client.start()
        client.join()
    # The server loops forever, so it has to be terminated
    server.terminate()
    server.join()

View File

@ -0,0 +1,74 @@
# Read all files in a directory and sum the size of the files by
# reading each file using `concurrent.futures`. If you want an
# extra challenge, walk through the directories recursively by
# letting the thread/process queue new items while running.
import concurrent.futures
import logging
import pathlib
import time
# Our current directory (the directory containing this file)
PATH = pathlib.Path(__file__).parent
def get_size(path: pathlib.Path) -> int:
    """Return the size of `path` in bytes, logging it as a side effect."""
    size = path.stat().st_size
    logging.info('%s is %d bytes', path, size)
    return size
def get_total_size(path) -> int:
    """Sum the sizes of `path`'s direct children using a thread pool."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        sizes = executor.map(get_size, path.iterdir())
        return sum(sizes)
def get_size_or_queue(
    executor: concurrent.futures.Executor,
    futures: list[concurrent.futures.Future],
    path: pathlib.Path,
) -> int:
    """Return a file's size, or 0 for a directory after queueing its children."""
    if not path.is_dir():
        return get_size(path)
    # Recurse by submitting each child as a new task; appending to
    # the shared `futures` list makes the results visible to the
    # caller that is iterating it
    for child in path.iterdir():
        future = executor.submit(
            get_size_or_queue, executor, futures, child)
        futures.append(future)
    # The directory entry itself contributes nothing to the total
    return 0
def get_total_size_recursive(path) -> int:
    """Recursively sum file sizes under `path` using a thread pool."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # A plain list doubles as the work/result queue here; this
        # is thread-safe because `list.append()` is atomic.
        futures: list[concurrent.futures.Future] = []
        futures.append(executor.submit(
            get_size_or_queue, executor, futures, path))
        # Workers append new futures while we consume; indexing
        # manually makes it explicit that we pick those up too.
        total_size = 0
        index = 0
        while index < len(futures):
            total_size += futures[index].result()
            index += 1
        return total_size
def main(path: pathlib.Path):
    """Run both the flat and the recursive size scan over `path`."""
    total_size = get_total_size(path)
    print(f'Total size for {path} is: {total_size}')
    # Sleep so editors such as Pycharm don't mix the output
    time.sleep(0.5)
    total_size = get_total_size_recursive(path)
    print(f'Recursive total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Use the parent directory to get a reasonable list of files
    main(PATH.parent)

View File

@ -0,0 +1,35 @@
# Read all files in a directory and sum the size of the files by
# reading each file using `multiprocessing`
import logging
import multiprocessing
import pathlib
# Directory to process: the grandparent directory of this file
PATH = pathlib.Path(__file__).parent.parent
# We need to setup the logging outside of the
# `if __name__ == '__main__'` block because the
# `multiprocessing` module will not execute that section
# in the spawned worker processes.
logging.basicConfig(level=logging.INFO)
def get_size(path: pathlib.Path):
    """Return the size of `path` in bytes, logging it relative to PATH."""
    size = path.stat().st_size
    logging.info('%s is %d bytes', path.relative_to(PATH), size)
    return size
def main(path: pathlib.Path):
    """Sum the sizes of all direct children of `path` via a process pool."""
    with multiprocessing.Pool() as pool:
        sizes = pool.map(get_size, path.iterdir())
        total_size = sum(sizes)
    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    main(PATH)

View File

@ -0,0 +1,45 @@
# Read all files in a directory and sum the size of the files by
# reading each file using `threading`
import logging
import pathlib
import threading
# Directory to process: the grandparent directory of this file
PATH = pathlib.Path(__file__).parent.parent
class FileSizeThread(threading.Thread):
    """Thread that stats a single file and stores its size."""

    def __init__(self, path: pathlib.Path):
        super().__init__()
        self.path = path
        # Filled in by run(); read by the caller after join()
        self.size = 0

    def run(self):
        size = self.path.stat().st_size
        self.size = size
        logging.info(
            '%s is %d bytes',
            self.path.relative_to(PATH),
            size,
        )
def main(path: pathlib.Path):
    """Stat every direct child of `path` in its own thread and sum sizes."""
    threads = []
    for child in path.iterdir():
        worker = FileSizeThread(child)
        worker.start()
        threads.append(worker)
    # Join each worker before reading its result
    total_size = 0
    for worker in threads:
        worker.join()
        total_size += worker.size
    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main(PATH)

View File

@ -0,0 +1,83 @@
# Read all files in a directory and sum the size of the files by reading each file using `processing` or `multiprocessing`
import logging
import multiprocessing
import pathlib
# Directory to process: the grandparent directory of this file
PATH = pathlib.Path(__file__).parent.parent
# Number of worker processes
WORKERS = 8
# NOTE(review): POLL_INTERVAL appears unused in this variant — the
# `queue.get()` below blocks without a timeout; confirm before removing
POLL_INTERVAL = 0.25
# We need to setup the logging outside of the
# `if __name__ == '__main__'` block because the
# `multiprocessing` module will not execute that section.
logging.basicConfig(level=logging.INFO)
class FileSizeProcess(multiprocessing.Process):
    """Worker process that sums file sizes taken from a shared queue.

    Directories popped from the queue are walked one level deep:
    files are counted, subdirectories are queued again so any worker
    can pick them up. The per-directory total is added to the shared
    `size` value.
    """
    # NOTE: `multiprocessing.Value`/`Queue` are factory functions,
    # not types — these annotations are informational only.
    size: multiprocessing.Value
    queue: multiprocessing.Queue

    def __init__(self, size, queue):
        super().__init__()
        self.queue = queue
        self.size = size

    def run(self):
        while True:
            path = self.queue.get()
            total_size = 0
            # Walk through the directory and sum the filesizes
            # for files and queue up directories
            child: pathlib.Path
            for child in path.iterdir():
                if child.is_dir():
                    self.queue.put(child)
                else:
                    size = child.stat().st_size
                    total_size += size
                    logging.info(
                        '%s is %d bytes',
                        child.relative_to(PATH),
                        size,
                    )
            # Fix: `value += x` is a non-atomic read-modify-write,
            # so concurrent workers could lose updates. Take the
            # Value's lock explicitly around the increment. We still
            # do this only once per directory to keep contention low.
            with self.size.get_lock():
                self.size.value += total_size
            # The JoinableQueue requires us to tell it that we are
            # done with the item
            self.queue.task_done()
def main(path: pathlib.Path):
    """Recursively sum file sizes under `path` using worker processes."""
    work_queue = multiprocessing.JoinableQueue()
    work_queue.put(path)
    total_size = multiprocessing.Value('i', 0)
    # Create, start and store the worker processes
    workers = []
    for _ in range(WORKERS):
        worker = FileSizeProcess(total_size, work_queue)
        worker.start()
        workers.append(worker)
    # Block until every queued directory has been processed
    work_queue.join()
    work_queue.close()
    # The workers loop forever, so terminate them explicitly
    for worker in workers:
        worker.terminate()
        worker.join()
    print(f'Total size for {path} is: {total_size.value}')


if __name__ == '__main__':
    main(PATH)

View File

@ -0,0 +1,85 @@
# Read all files in a directory and sum the size of the files by
# reading each file using `threading` or `multiprocessing`
#
# As above, but walk through the directories recursively by
# letting the thread/process queue new items while running.
import logging
import pathlib
import queue
import threading
# Directory to process: the grandparent directory of this file
PATH = pathlib.Path(__file__).parent.parent
# Number of worker threads
WORKERS = 8
# Seconds to wait on the queue before re-checking the stop event
POLL_INTERVAL = 0.25
class FileSizeThread(threading.Thread):
    """Worker thread that sums file sizes taken from a shared queue."""
    # External `stop` event so the main thread can shut us down
    stop: threading.Event
    size: int
    queue: queue.Queue

    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.size = 0
        self.stop = threading.Event()

    def run(self):
        while not self.stop.is_set():
            # Poll the queue with a timeout so we regularly re-check
            # the stop event instead of blocking forever when the
            # queue is empty.
            try:
                path = self.queue.get(timeout=POLL_INTERVAL)
            except queue.Empty:
                continue
            # Files are summed directly; directories are re-queued
            for child in path.iterdir():
                self.process_path(child)

    def process_path(self, child):
        """Add a file's size to the tally, or queue a directory."""
        if child.is_dir():
            self.queue.put(child)
            return
        size = child.stat().st_size
        self.size += size
        logging.info(
            '%s is %d bytes',
            child.relative_to(PATH),
            size,
        )
def main(path: pathlib.Path):
    """Recursively sum file sizes under `path` using worker threads.

    Fix: the original set every thread's `stop` event immediately
    after starting the workers, so the threads usually exited before
    (or while) the queue was processed and the reported total was
    racy/too small. We now wait for the queue to drain first.
    """
    import time

    threads = []
    q = queue.Queue()
    q.put(path)
    # Create, start and store the worker threads
    for _ in range(WORKERS):
        thread = FileSizeThread(q)
        thread.start()
        threads.append(thread)
    # Wait until the queue has been drained. NOTE(review): a worker
    # can still be iterating a directory (about to queue more work)
    # while the queue momentarily looks empty, so sleep one extra
    # poll interval afterwards; a JoinableQueue/task_done scheme
    # would be fully race-free but needs matching worker changes.
    while not q.empty():
        time.sleep(POLL_INTERVAL)
    time.sleep(POLL_INTERVAL)
    # Now it is safe to stop all threads
    for thread in threads:
        thread.stop.set()
    # Wait for all threads to finish and sum their sizes
    total_size = 0
    for thread in threads:
        thread.join()
        total_size += thread.size
    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main(PATH)

View File

@ -0,0 +1,5 @@
# Create a pool of workers that keeps waiting for items to be
# queued through `multiprocessing.Queue()`.
# Please refer to exercise_04/multiprocessing_solution_00.py for
# the solution to the exercise as it already uses this technique.

View File

@ -0,0 +1,50 @@
# Convert the pool above to a safe RPC (remote procedure call)
# type operation.
import multiprocessing
# Number of worker processes to spawn
WORKERS = 4


def say(msg):
    """Example RPC target: print the given message."""
    print(f'Saying: {msg}')


# Explicitly define the RPC methods to make this safer: only
# functions listed here can be invoked through the queue
RPC_METHODS = dict(say=say)
class RpcProcess(multiprocessing.Process):
    """Worker that executes queued `(name, args, kwargs)` RPC calls."""

    def __init__(self, queue: multiprocessing.Queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            func_name, args, kwargs = self.queue.get()
            # Fix: an unknown method name used to raise KeyError,
            # killing the worker *before* `task_done()` was called,
            # which made `queue.join()` in main() block forever.
            # Report the error instead and always mark the item done.
            try:
                func = RPC_METHODS[func_name]
            except KeyError:
                print(f'Invalid RPC method: {func_name!r}')
            else:
                func(*args, **kwargs)
            finally:
                self.queue.task_done()
def main():
    """Queue a few RPC calls and let worker processes execute them."""
    rpc_queue = multiprocessing.JoinableQueue()
    rpc_queue.put(('say', ('hello',), {}))
    rpc_queue.put(('say', ('world',), {}))
    # This should result in an error because this is not a valid
    # RPC method
    rpc_queue.put(('non-existing-method', (), {}))
    for _ in range(WORKERS):
        worker = RpcProcess(rpc_queue)
        worker.start()
    # Wait until every queued call has been handled
    rpc_queue.join()
    rpc_queue.close()
    # The workers loop forever, so terminate them explicitly
    for child in multiprocessing.active_children():
        child.terminate()
        child.join()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,105 @@
# Apply your functional programming skills and calculate
# something in a parallel way. Perhaps parallel sorting?
import multiprocessing
import random
# Since sorting is CPU limited we should not go above the number of
# CPU cores (extra workers would only add scheduling overhead)
WORKERS = multiprocessing.cpu_count()
def merge_sort(data):
    """Classic top-down merge sort; returns a new sorted list."""
    if len(data) <= 1:
        return data
    middle = len(data) // 2
    # Sort both halves independently, then merge them back together
    return merge(merge_sort(data[:middle]), merge_sort(data[middle:]))
def merge(left, right):
    '''
    Merge two sorted lists into one sorted list.

    Fix: the original EAFP version indexed `left[0]`/`right[0]`
    *before* entering its `try` block, so it raised an uncaught
    `IndexError` whenever either input was empty. This bounded-loop
    version handles empty inputs as well.

    >>> merge([1, 3, 5], [2, 4, 6])
    [1, 2, 3, 4, 5, 6]
    >>> merge([1, 3, 5], [2, 4, 6, 7])
    [1, 2, 3, 4, 5, 6, 7]
    >>> merge([1, 2, 3], [1, 2, 3])
    [1, 1, 2, 2, 3, 3]
    >>> merge([], [1, 2])
    [1, 2]
    '''
    result = []
    left_index = right_index = 0
    # Take the smaller head element until one side runs out. Using
    # `<=` keeps the merge stable: ties come from `left` first.
    while left_index < len(left) and right_index < len(right):
        if left[left_index] <= right[right_index]:
            result.append(left[left_index])
            left_index += 1
        else:
            result.append(right[right_index])
            right_index += 1
    # At most one of these slices is non-empty; append the leftovers
    result.extend(left[left_index:])
    result.extend(right[right_index:])
    return result
def split(data, size=None):
    '''
    Split a list into at most `size` chunks of (near-)equal length
    so that each chunk can be processed in parallel. `size` defaults
    to `WORKERS` (resolved at call time).

    Fixes over the original:
    - `len(data) < size` made `chunk_size` 0, which raised
      `ValueError` from `range()` with a zero step; empty input is
      now handled too.
    - Floor division could produce MORE than `size` chunks (e.g.
      100 items / 8 workers gave 9 chunks); ceiling division caps
      the chunk count at `size` as the docstring promises.
    '''
    if size is None:
        size = WORKERS
    if not data:
        return []
    # Ceiling division: never more than `size` chunks, never a step
    # of 0 in range()
    chunk_size = -(-len(data) // size)
    return [data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)]
def multiprocessing_merge_sort(data):
    """Sort `data` by merge-sorting chunks in a process pool.

    The chunks are sorted in parallel and then repeatedly merged
    pairwise (also in parallel) until a single sorted list remains.
    Fixes: removed the unused `i = 0` counter and handle empty
    input instead of raising `IndexError`.
    """
    with multiprocessing.Pool(processes=WORKERS) as pool:
        # Sort the individual chunks in parallel
        chunks = split(data, WORKERS)
        sorted_chunks = pool.map(merge_sort, chunks)
        if not sorted_chunks:
            return []
        # Merge pairs of chunks until only one chunk is left
        while len(sorted_chunks) > 1:
            # zip the chunks into pairs
            pairs = zip(sorted_chunks[::2], sorted_chunks[1::2])
            # merge the pairs in parallel
            merged_chunks = pool.starmap(merge, pairs)
            # An odd chunk count leaves the last chunk unpaired;
            # carry it over to the next round as-is
            if len(sorted_chunks) % 2 == 1:
                merged_chunks.append(sorted_chunks[-1])
            sorted_chunks = merged_chunks
        return sorted_chunks[0]
def main():
    """Smoke test: the parallel sort must match the built-in `sorted`."""
    data = random.sample(range(1000), 100)
    sorted_data = multiprocessing_merge_sort(data)
    # Verify that the data is sorted correctly
    assert sorted_data == sorted(data)


if __name__ == '__main__':
    main()