From e314d433e66a6177c8a17e3fca5acc59005e1b06 Mon Sep 17 00:00:00 2001 From: Haidong Ji Date: Tue, 29 Jan 2019 16:18:10 -0600 Subject: Network Packet done! It's not too bad, since I worked things out in Java first. 2 things are of note: 1. Java Deque's pop is Python Deque's popleft. 2. I didn't use an iterator to remove items. It seemed to work fine. --- sources/network_packet.py | 63 +++++++++++++++++++++++++++++++++++++++++++++ tests/network_packetTest.py | 52 +++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 sources/network_packet.py create mode 100644 tests/network_packetTest.py diff --git a/sources/network_packet.py b/sources/network_packet.py new file mode 100644 index 0000000..46ac2fb --- /dev/null +++ b/sources/network_packet.py @@ -0,0 +1,63 @@ +# python3 + +from collections import namedtuple, deque + +Request = namedtuple("Request", ["arrived_at", "time_to_process"]) +Response = namedtuple("Response", ["was_dropped", "started_at"]) + + +class Buffer: + def __init__(self, capacity): + self.capacity = capacity + self.finish_time = deque() + self.running_time_counter = 0 + + def process(self, request): + if self.running_time_counter < request.arrived_at: + if len(self.finish_time) == 0: + self.running_time_counter = request.arrived_at + elif self.finish_time[-1] < request.arrived_at: + self.running_time_counter = request.arrived_at + + while len(self.finish_time) > 0: + j = self.finish_time[0] + if j <= request.arrived_at: + self.finish_time.popleft() + else: + break + + if len(self.finish_time) >= self.capacity: + return Response(True, -1) + + response = Response(False, self.running_time_counter) + self.running_time_counter = self.running_time_counter + request.time_to_process + if len(self.finish_time) > 0: + self.finish_time.append(self.finish_time[-1] + request.time_to_process) + else: + self.finish_time.append(self.running_time_counter) + return response + + +def process_requests(requests, buffer): + responses = [] + for request in requests: + responses.append(buffer.process(request)) + return responses + + +def main(): + buffer_size, n_requests = map(int, input().split()) + requests = [] + for _ in range(n_requests): + arrived_at, time_to_process = map(int, input().split()) + requests.append(Request(arrived_at, time_to_process)) + + buffer = Buffer(buffer_size) + responses = process_requests(requests, buffer) + + for response in responses: + print(response.started_at if not response.was_dropped else -1) + + +if __name__ == "__main__": + main() diff --git a/tests/network_packetTest.py b/tests/network_packetTest.py new file mode 100644 index 0000000..c5bb709 --- /dev/null +++ b/tests/network_packetTest.py @@ -0,0 +1,52 @@ +import unittest + +from collections import namedtuple + +Request = namedtuple("Request", ["arrived_at", "time_to_process"]) +Response = namedtuple("Response", ["was_dropped", "started_at"]) + +import sources.network_packet +from sources.network_packet import process_requests + + +class MyTestCase(unittest.TestCase): + def test(self): + buffer_max_size = 1 + buf = sources.network_packet.Buffer(buffer_max_size) + requests = [] + responses = process_requests(requests, buf) + self.assertEqual(0, len(responses)) + + def test1(self): + buffer_max_size = 1 + buf = sources.network_packet.Buffer(buffer_max_size) + requests = [Request(0, 1)] + responses = process_requests(requests, buf) + self.assertEqual(1, len(responses)) + self.assertFalse(responses[0].was_dropped) + self.assertEqual(0, responses[0].started_at) + + def test2(self): + buffer_max_size = 1 + buf = sources.network_packet.Buffer(buffer_max_size) + requests = [Request(0, 1), Request(0, 1)] + responses = process_requests(requests, buf) + self.assertEqual(2, len(responses)) + self.assertFalse(responses[0].was_dropped) + self.assertEqual(0, responses[0].started_at) + self.assertTrue(responses[1].was_dropped) + + def test3(self): + buffer_max_size = 1 + buf = sources.network_packet.Buffer(buffer_max_size) + requests = [Request(0, 1), Request(1, 1)] + responses = process_requests(requests, buf) + self.assertEqual(2, len(responses)) + self.assertFalse(responses[0].was_dropped) + self.assertEqual(0, responses[0].started_at) + self.assertFalse(responses[1].was_dropped) + self.assertEqual(1, responses[1].started_at) + + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.3