diff --git a/DIRECTORY.md b/DIRECTORY.md index 4e99d164..9fc52691 100644 --- a/DIRECTORY.md +++ b/DIRECTORY.md @@ -588,6 +588,8 @@ * [Visitor](https://github.com/BrianLusina/PythonSnips/blob/master/design_patterns/behavioral/visitor/visitor.py) * Browser History * [Test Browser History](https://github.com/BrianLusina/PythonSnips/blob/master/design_patterns/browser_history/test_browser_history.py) + * Continuous Median + * [Test Continuous Median Handler](https://github.com/BrianLusina/PythonSnips/blob/master/design_patterns/continuous_median/test_continuous_median_handler.py) * Linked List * [Test Linked List](https://github.com/BrianLusina/PythonSnips/blob/master/design_patterns/linked_list/test_linked_list.py) * Oop diff --git a/design_patterns/continuous_median/__init__.py b/design_patterns/continuous_median/__init__.py new file mode 100644 index 00000000..98dd1130 --- /dev/null +++ b/design_patterns/continuous_median/__init__.py @@ -0,0 +1,28 @@ +from typing import Optional +import heapq + + +class ContinuousMedianHandler: + def __init__(self): + self.median: Optional[int | float] = None + # Max heap keeps track of the lower half of numbers + self.max_heap = [] + # Min Heap keeps track of the upper half of numbers + self.min_heap = [] + + def insert(self, number: int) -> None: + heapq.heappush(self.min_heap, -heapq.heappushpop(self.max_heap, -number)) + + if len(self.min_heap) - len(self.max_heap) > 1: + heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap)) + + if len(self.max_heap) == len(self.min_heap): + max_heap_root = self.max_heap[0] + min_heap_root = self.min_heap[0] + median = (min_heap_root - max_heap_root) / 2 + self.median = median + else: + self.median = self.min_heap[0] + + def get_median(self) -> Optional[int | float]: + return self.median diff --git a/design_patterns/continuous_median/test_continuous_median_handler.py b/design_patterns/continuous_median/test_continuous_median_handler.py new file mode 100644 index 00000000..3a474a5f --- /dev/null +++ b/design_patterns/continuous_median/test_continuous_median_handler.py @@ -0,0 +1,30 @@ +import unittest +from typing import List, Tuple +from parameterized import parameterized +from design_patterns.continuous_median import ContinuousMedianHandler + +CONTINUOUS_MEDIA_HANDLER_TESTS = [ + ( + ([(1, 2), 1.50000], [(3), 2.00000]), + ), +] + + +class ContinuousMedianHandlerTestCase(unittest.TestCase): + @parameterized.expand(CONTINUOUS_MEDIA_HANDLER_TESTS) + def test_continuous_median_handler(self, requests: List[Tuple[List[Tuple[int]] | int, int | float]]): + median_handler = ContinuousMedianHandler() + for request in requests: + data, expected = request + if type(data) is int: + median_handler.insert(data) + else: + for d in data: + median_handler.insert(d) + + actual = median_handler.get_median() + self.assertEqual(expected, actual) + + +if __name__ == "__main__": + unittest.main()