# CH_13_async_io/exercise_01/solution_00.py
#
# Try to create a `asyncio` base class that automatically
# registers all instances for easy closing/destructuring when you
# are done
import abc
import asyncio


class AsyncBase(abc.ABC):
    # Shared registry of every instance created and not yet closed.
    _instances = []

    def __init__(self):
        # Register the freshly created instance so `AsyncManager`
        # can close it later.
        self._instances.append(self)

    async def close(self):
        raise NotImplementedError


class AsyncManager(AsyncBase):
    # The instance management lives in a separate class so the
    # namespace of the base class stays clean.

    @classmethod
    async def close(cls):
        # Pop while closing so the registry is emptied as we go.
        while cls._instances:
            instance = cls._instances.pop()
            await instance.close()

    # Support `async with` syntax as well
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


class A(AsyncBase):
    def __init__(self):
        super().__init__()
        print('A.__init__')

    async def close(self):
        print('A.close')


class B(AsyncBase):
    def __init__(self):
        super().__init__()
        print('B.__init__')

    async def close(self):
        print('B.close')


async def main():
    print('Using close method directly')

    A()
    B()
    await AsyncManager.close()

    print()


async def main_with():
    print('Using async with')

    async with AsyncManager():
        A()
        B()

    print()


if __name__ == '__main__':
    asyncio.run(main())
    asyncio.run(main_with())
0000000..fe28252 --- /dev/null +++ b/CH_13_async_io/exercise_02/solution_00.py @@ -0,0 +1,92 @@ +# Create an `asyncio` wrapper class for a synchronous process +# such as file or network operations using executors + +# This example shows an `AsyncioFile` class that makes your file +# operations asynchronous by running them in a separate thread. +# If your operation has a tendency to block the Python GIL you +# could also opt for using a ProcessPoolExecutor instead. +# +# Note that for real-life usage I would recommend the aiofiles +# module over this class. + +import asyncio +import concurrent.futures +import functools +import pathlib +from asyncio import AbstractEventLoop +from concurrent.futures import ThreadPoolExecutor + + +class AsyncExecutorBase: + _executor: ThreadPoolExecutor + _loop: AbstractEventLoop + + def __init__(self): + self._executor = concurrent.futures.ThreadPoolExecutor() + self._loop = asyncio.get_running_loop() + super().__init__() + + def _run_in_executor(self, func, *args, **kwargs): + # Note that this method is not async but can be awaited + # because it returns a coroutine. 
Alternatively, we could + # have made this method async and used `await` before + # returning + return self._loop.run_in_executor( + self._executor, + functools.partial(func, *args, **kwargs), + ) + + +class AsyncioFile(AsyncExecutorBase): + _path: pathlib.Path + + def __init__(self, path: pathlib.Path): + super().__init__() + self._path = path + + async def exists(self) -> bool: + return await self._run_in_executor(self._path.exists) + + async def rename(self, target): + return await self._run_in_executor( + self._path.rename, + target, + ) + + async def read_text(self, encoding=None, errors=None): + return await self._run_in_executor( + self._path.read_text, + encoding=encoding, + errors=errors, + ) + + async def read_bytes(self): + return await self._run_in_executor(self._path.read_bytes) + + async def write_text(self, data, encoding=None, errors=None, + newline=None): + return await self._run_in_executor( + self._path.write_text, + data, + encoding=encoding, + errors=errors, + newline=newline, + ) + + async def write_bytes(self, data): + return await self._run_in_executor( + self._path.write_bytes, + data, + ) + +async def main(): + afile = AsyncioFile(pathlib.Path(__file__)) + + print('#' * 79) + print('Exists:', await afile.exists()) + print('#' * 79) + print('Contents:') + print(await afile.read_text()) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/CH_14_multithreading_and_multiprocessing/exercise_01/__init__.py b/CH_14_multithreading_and_multiprocessing/exercise_01/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/CH_14_multithreading_and_multiprocessing/exercise_01/solution_00.py b/CH_14_multithreading_and_multiprocessing/exercise_01/solution_00.py new file mode 100644 index 0000000..68a4448 --- /dev/null +++ b/CH_14_multithreading_and_multiprocessing/exercise_01/solution_00.py @@ -0,0 +1,40 @@ +# See if you can make an echo server and client as separate +# processes. 
# CH_14_multithreading_and_multiprocessing/exercise_01/solution_00.py
#
# See if you can make an echo server and client as separate
# processes. `a, b = multiprocessing.Pipe()` creates two connected
# endpoints: whatever is sent through `a` can be received through
# `b` and vice versa.
import multiprocessing


def echo_client(receive_pipe, send_pipe, message):
    """Send `message` through `send_pipe` and print the echo that
    arrives on `receive_pipe`."""
    print('client sending', message)
    send_pipe.send(message)
    print('client received', receive_pipe.recv())


def echo_server(receive_pipe, send_pipe):
    """Echo every message received on `receive_pipe` back through
    `send_pipe`, forever (stopped externally via terminate)."""
    while True:
        message = receive_pipe.recv()
        print('server received', message)
        send_pipe.send(message)


if __name__ == '__main__':
    # BUGFIX: previously both the server and the clients were
    # handed both pipe ends (`args=(a, b)`). The server's echo
    # then arrived back on the very end the server recv()s from,
    # so the server raced its own clients for the reply. The
    # server must own one endpoint exclusively and the clients
    # the other.
    a, b = multiprocessing.Pipe()
    server = multiprocessing.Process(
        target=echo_server,
        # Receive from and reply through endpoint `a`: the reply
        # travels across the pipe to `b`, which only clients read.
        args=(a, a),
    )
    server.start()

    for i in range(5):
        client = multiprocessing.Process(
            target=echo_client,
            args=(b, b, f'message {i}'),
        )
        client.start()
        client.join()

    server.terminate()
    server.join()
# CH_14_multithreading_and_multiprocessing/exercise_02/solution_00.py
#
# Read all files in a directory and sum the size of the files by
# reading each file using `concurrent.futures`. If you want an
# extra challenge, walk through the directories recursively by
# letting the thread/process queue new items while running.

import concurrent.futures
import logging
import pathlib
import time

# Our current directory
PATH = pathlib.Path(__file__).parent


def get_size(path: pathlib.Path) -> int:
    """Return (and log) the size of `path` in bytes."""
    size = path.stat().st_size
    logging.info('%s is %d bytes', path, size)
    return size


def get_total_size(path) -> int:
    """Sum the sizes of the direct children of `path`."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return sum(executor.map(get_size, path.iterdir()))


def get_size_or_queue(
    executor: concurrent.futures.Executor,
    futures: list[concurrent.futures.Future],
    path: pathlib.Path,
) -> int:
    """Return the size of a file, or fan out over a directory.

    A directory contributes 0 bytes itself; its children are
    submitted to `executor` and the resulting futures appended to
    `futures` so the caller can collect them later.
    """
    if not path.is_dir():
        return get_size(path)

    for child in path.iterdir():
        futures.append(executor.submit(
            get_size_or_queue, executor, futures, child))

    return 0


def get_total_size_recursive(path) -> int:
    """Sum file sizes below `path`, recursing into directories."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # A plain list doubles as the work queue here; that is
        # thread-safe because `list.append()` is atomic under the
        # GIL. The index-based `for` below also sees items that are
        # appended while we are already iterating, and a child is
        # always submitted before its parent's `result()` resolves,
        # so the loop cannot terminate early.
        futures: list[concurrent.futures.Future] = []
        futures.append(executor.submit(
            get_size_or_queue, executor, futures, path))

        total_size = 0
        for future in futures:
            total_size += future.result()

    return total_size


def main(path: pathlib.Path):
    total_size = get_total_size(path)
    print(f'Total size for {path} is: {total_size}')

    # Sleep so editors such as Pycharm don't mix the output
    time.sleep(0.5)

    total_size = get_total_size_recursive(path)
    print(f'Recursive total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # Use the parent directory to get a reasonable list of files
    main(PATH.parent)


# CH_14_multithreading_and_multiprocessing/exercise_03/
# multiprocessing_solution_00.py
#
# Read all files in a directory and sum the size of the files by
# reading each file using `multiprocessing`

import multiprocessing

# Directory to process
PATH = pathlib.Path(__file__).parent.parent

# We need to setup the logging outside of the
# `if __name__ == '__main__'` block because the
# `multiprocessing` module will not execute that section.
# CH_14_multithreading_and_multiprocessing/exercise_03/
# multiprocessing_solution_00.py (continued)
import logging
import multiprocessing
import pathlib
import threading

# Directory to process
PATH = pathlib.Path(__file__).parent.parent

# Logging is configured at import time on purpose: worker
# processes re-import this module and skip the
# `if __name__ == '__main__'` section entirely.
logging.basicConfig(level=logging.INFO)


def get_size(path: pathlib.Path):
    """Return (and log) the size of `path` in bytes."""
    size = path.stat().st_size
    logging.info(
        '%s is %d bytes',
        path.relative_to(PATH),
        size,
    )
    return size


def main(path: pathlib.Path):
    """Sum the sizes of the direct children of `path` using a
    process pool."""
    with multiprocessing.Pool() as pool:
        total_size = sum(pool.map(get_size, path.iterdir()))

    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    main(PATH)


# CH_14_multithreading_and_multiprocessing/exercise_03/
# threading_solution_00.py
#
# Read all files in a directory and sum the size of the files by
# reading each file using `threading`


class FileSizeThread(threading.Thread):
    """Thread that stats a single path and stores its size."""

    def __init__(self, path: pathlib.Path):
        super().__init__()
        self.path = path
        # Filled in by `run()`; 0 until the thread has executed.
        self.size = 0

    def run(self):
        self.size = self.path.stat().st_size
        logging.info(
            '%s is %d bytes',
            self.path.relative_to(PATH),
            self.size,
        )


def main(path: pathlib.Path):
    """Start one thread per direct child of `path` and sum the
    sizes they report."""
    threads = [FileSizeThread(child) for child in path.iterdir()]
    for thread in threads:
        thread.start()

    total_size = 0
    for thread in threads:
        thread.join()
        total_size += thread.size

    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main(PATH)
# CH_14_multithreading_and_multiprocessing/exercise_04/
# multiprocessing_solution_00.py
#
# Read all files in a directory and sum the size of the files by
# reading each file using `multiprocessing`, walking through the
# directories recursively by letting the processes queue new
# items while running.

import logging
import multiprocessing
import pathlib

# Directory to process
PATH = pathlib.Path(__file__).parent.parent
WORKERS = 8

# We need to setup the logging outside of the
# `if __name__ == '__main__'` block because the
# `multiprocessing` module will not execute that section.
logging.basicConfig(level=logging.INFO)


def scan_directory(path, queue):
    """Sum the sizes of the regular files directly inside `path`.

    Subdirectories are not descended into here; they are put on
    `queue` so any worker can pick them up later. Returns the
    number of bytes found.
    """
    total_size = 0
    child: pathlib.Path
    for child in path.iterdir():
        if child.is_dir():
            queue.put(child)
        else:
            size = child.stat().st_size
            total_size += size
            # Log the full path: unlike `relative_to(PATH)` this
            # also works when scanning paths outside of PATH.
            logging.info('%s is %d bytes', child, size)
    return total_size


class FileSizeProcess(multiprocessing.Process):
    # Shared counter for the grand total and the work queue.
    size: multiprocessing.Value
    queue: multiprocessing.Queue

    def __init__(self, size, queue):
        super().__init__()
        self.queue = queue
        self.size = size

    def run(self):
        while True:
            path = self.queue.get()
            total_size = scan_directory(path, self.queue)

            # BUGFIX: `Value.value += n` is a read-modify-write
            # and is NOT atomic across processes, so take the
            # Value's built-in lock explicitly. Doing this once
            # per directory keeps the (relatively slow) shared
            # memory traffic low.
            with self.size.get_lock():
                self.size.value += total_size

            # The JoinableQueue requires us to tell it that we
            # are done with the item
            self.queue.task_done()


def main(path: pathlib.Path):
    processes = []
    q = multiprocessing.JoinableQueue()
    q.put(path)

    # 'q' (signed 64-bit) instead of 'i' so the total does not
    # overflow on directory trees larger than 2 GiB.
    total_size = multiprocessing.Value('q', 0)

    # Create, start and store the worker processes
    for _ in range(WORKERS):
        process = FileSizeProcess(total_size, q)
        process.start()
        processes.append(process)

    # Wait until all the items in the queue have been processed
    q.join()
    q.close()

    # The workers loop forever, so terminate them once the queue
    # has been drained
    for process in processes:
        process.terminate()
        process.join()

    print(f'Total size for {path} is: {total_size.value}')


if __name__ == '__main__':
    main(PATH)
# CH_14_multithreading_and_multiprocessing/exercise_04/
# threading_solution_00.py
#
# Read all files in a directory and sum the size of the files by
# reading each file using `threading`, walking through the
# directories recursively by letting the threads queue new items
# while running.

import logging
import pathlib
import queue
import threading

# Directory to process
PATH = pathlib.Path(__file__).parent.parent
WORKERS = 8
POLL_INTERVAL = 0.25


class FileSizeThread(threading.Thread):
    # `stop` event so the main thread can shut the worker down
    stop: threading.Event
    size: int
    queue: queue.Queue

    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.size = 0
        self.stop = threading.Event()

    def run(self):
        while not self.stop.is_set():
            # Fetch the next item; poll with a timeout so we can
            # notice `stop` even while the queue stays empty.
            try:
                path = self.queue.get(timeout=POLL_INTERVAL)
            except queue.Empty:
                continue

            try:
                # Sum the file sizes and queue up any directories
                for child in path.iterdir():
                    self.process_path(child)
            finally:
                # BUGFIX: acknowledge the item so `queue.join()`
                # in `main()` can tell when all work is really
                # finished. Without this, `main()` used to stop
                # the workers immediately after starting them —
                # racing them and potentially reporting 0 bytes.
                self.queue.task_done()

    def process_path(self, child):
        """Add a file's size to this thread's total, or queue a
        directory for any worker to pick up."""
        if child.is_dir():
            self.queue.put(child)
        else:
            size = child.stat().st_size
            self.size += size
            try:
                display = child.relative_to(PATH)
            except ValueError:
                # Outside of PATH; log the full path instead so
                # arbitrary directories can be scanned too.
                display = child
            logging.info('%s is %d bytes', display, size)


def main(path: pathlib.Path):
    threads = []
    q = queue.Queue()
    q.put(path)

    # Create, start and store the worker threads
    for _ in range(WORKERS):
        thread = FileSizeThread(q)
        thread.start()
        threads.append(thread)

    # BUGFIX: wait until every queued directory has actually been
    # processed before signalling the workers to stop.
    q.join()

    # Stop all threads
    for thread in threads:
        thread.stop.set()

    # Wait for all threads to finish and sum their sizes
    total_size = 0
    for thread in threads:
        thread.join()
        total_size += thread.size

    print(f'Total size for {path} is: {total_size}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main(PATH)
b/CH_14_multithreading_and_multiprocessing/exercise_05/solution_00.py new file mode 100644 index 0000000..7097521 --- /dev/null +++ b/CH_14_multithreading_and_multiprocessing/exercise_05/solution_00.py @@ -0,0 +1,5 @@ +# Create a pool of workers that keeps waiting for items to be +# queued through `multiprocessing.Queue()`. + +# Please refer to exercise_04/multiprocessing_solution_00.py for +# the solution to the exercise as it already uses this technique. diff --git a/CH_14_multithreading_and_multiprocessing/exercise_06/__init__.py b/CH_14_multithreading_and_multiprocessing/exercise_06/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/CH_14_multithreading_and_multiprocessing/exercise_06/solution_00.py b/CH_14_multithreading_and_multiprocessing/exercise_06/solution_00.py new file mode 100644 index 0000000..ebe5ee6 --- /dev/null +++ b/CH_14_multithreading_and_multiprocessing/exercise_06/solution_00.py @@ -0,0 +1,50 @@ +# Convert the pool above to a safe RPC (remote procedure call) +# type operation. 
# CH_14_multithreading_and_multiprocessing/exercise_06/solution_00.py
#
# Convert the pool above to a safe RPC (remote procedure call)
# type operation.
import multiprocessing

WORKERS = 4


def say(msg):
    print(f'Saying: {msg}')


# Explicitly define the RPC methods to make this safer
RPC_METHODS = dict(say=say)


class RpcProcess(multiprocessing.Process):
    """Worker that executes whitelisted RPC calls from a queue."""

    def __init__(self, queue: multiprocessing.Queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()

            # `None` is the sentinel asking this worker to exit
            if item is None:
                self.queue.task_done()
                break

            func_name, args, kwargs = item
            try:
                # Only whitelisted names may be called; anything
                # else is reported instead of executed.
                func = RPC_METHODS[func_name]
            except KeyError:
                print(f'Invalid RPC method: {func_name!r}')
            else:
                func(*args, **kwargs)
            finally:
                # BUGFIX: always acknowledge the item. Previously
                # an unknown method name raised KeyError before
                # `task_done()`, so `queue.join()` in `main()`
                # blocked forever.
                self.queue.task_done()


def main():
    q = multiprocessing.JoinableQueue()
    q.put(('say', ('hello',), {}))
    q.put(('say', ('world',), {}))
    # This should result in an error because this is not a valid
    # RPC method
    q.put(('non-existing-method', (), {}))

    processes = []
    for _ in range(WORKERS):
        p = RpcProcess(q)
        p.start()
        processes.append(p)

    # One sentinel per worker so every process exits its loop
    # cleanly instead of being terminated mid-task
    for _ in range(WORKERS):
        q.put(None)

    q.join()
    q.close()

    for p in processes:
        p.join()


if __name__ == '__main__':
    main()
# CH_14_multithreading_and_multiprocessing/exercise_07/solution_00.py
#
# Apply your functional programming skills and calculate
# something in a parallel way. Perhaps parallel sorting?
import multiprocessing
import random

# Since sorting is CPU limited we should not go above the number of
# CPU cores
WORKERS = multiprocessing.cpu_count()


def merge_sort(data):
    """Classic top-down merge sort; returns a new sorted list."""
    if len(data) <= 1:
        return data

    middle = len(data) // 2
    left = merge_sort(data[:middle])
    right = merge_sort(data[middle:])
    return merge(left, right)


def merge(left, right):
    '''
    Merge two sorted lists into one sorted list

    >>> merge([1, 3, 5], [2, 4, 6])
    [1, 2, 3, 4, 5, 6]
    >>> merge([1, 3, 5], [2, 4, 6, 7])
    [1, 2, 3, 4, 5, 6, 7]
    >>> merge([1, 2, 3], [1, 2, 3])
    [1, 1, 2, 2, 3, 3]
    >>> merge([], [1, 2])
    [1, 2]
    '''
    result = []
    left_index = right_index = 0

    # BUGFIX: the previous version read `left[0]`/`right[0]`
    # before entering its loop, so an empty input list raised
    # IndexError. Plain bounds checks handle that case naturally.
    while left_index < len(left) and right_index < len(right):
        if left[left_index] <= right[right_index]:
            result.append(left[left_index])
            left_index += 1
        else:
            result.append(right[right_index])
            right_index += 1

    # At most one of these still has elements; both extends are
    # no-ops for exhausted sides.
    result.extend(left[left_index:])
    result.extend(right[right_index:])
    return result


def split(data, size=WORKERS):
    '''
    Split a list into at most `size` chunks of roughly equal
    length so that each chunk can be processed in parallel.

    >>> split(list(range(6)), 3)
    [[0, 1], [2, 3], [4, 5]]
    >>> split([1], 4)
    [[1]]
    '''
    # BUGFIX: round up instead of down. Floor division produced
    # up to twice as many chunks as requested, and a step of 0
    # (ValueError in range()) whenever len(data) < size.
    chunk_size = max(1, -(-len(data) // size))
    return [data[i:i + chunk_size]
            for i in range(0, len(data), chunk_size)]


def multiprocessing_merge_sort(data):
    """Sort `data` by merge-sorting chunks in a process pool and
    then merging the sorted chunks pairwise in parallel."""
    # Guard the empty input: split() would otherwise yield no
    # chunks and `sorted_chunks[0]` would fail below.
    if not data:
        return []

    with multiprocessing.Pool(processes=WORKERS) as pool:
        # Sort the chunks in parallel
        sorted_chunks = pool.map(merge_sort, split(data, WORKERS))

        # Repeatedly merge pairs of chunks until one remains
        while len(sorted_chunks) > 1:
            pairs = zip(sorted_chunks[::2], sorted_chunks[1::2])
            merged_chunks = pool.starmap(merge, pairs)

            # With an odd number of chunks the last one has no
            # partner; carry it over unmerged
            if len(sorted_chunks) % 2 == 1:
                merged_chunks.append(sorted_chunks[-1])

            sorted_chunks = merged_chunks

    return sorted_chunks[0]


def main():
    data = random.sample(range(1000), 100)
    sorted_data = multiprocessing_merge_sort(data)
    # Verify that the data is sorted correctly
    assert sorted_data == sorted(data)


if __name__ == '__main__':
    main()