edgedb-challenge/port-opener.py

106 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import asyncio
import logging
import random
import socket
import signal
import functools
async def main():
    """Run a loopback ping/pong server and keep many client connections open.

    Parses the command line, starts the server on the loopback address of the
    selected IP version, waits until it is listening, then spawns one client
    task per requested connection and runs until the tasks finish or the
    coroutine is cancelled.

    Raises:
        ValueError: if ``--ipv`` is neither 4 nor 6 (defensive; argparse's
            ``choices`` already rejects other values).
        Exception: whatever ``start_server`` raised, if the server task
            finishes before signalling that it has started.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', '-p', type=int, default=8500, help="port to listen and make connections on")
    parser.add_argument('--num-connections', '-n', type=int, default=100, help="number of connections to open")
    parser.add_argument('--ipv', type=int, default=4, choices=[4, 6], help="IP version (4 or 6) to use")
    parser.add_argument('--verbose', '-v', action='store_true', help="enable verbose logging")
    args = parser.parse_args()

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(format="%(message)s", level=level)

    if args.ipv == 4:
        host = "127.0.0.1"
        family = socket.AF_INET
    elif args.ipv == 6:
        host = "::1"
        family = socket.AF_INET6
    else:
        raise ValueError(args.ipv)

    loop = asyncio.get_running_loop()
    tasks = []
    started_event = asyncio.Event()
    server_task = asyncio.create_task(start_server(loop, host, args.port, family, started_event))
    tasks.append(server_task)

    # Wait for whichever comes first: the server signalling that it is
    # listening, or the server task ending. Waiting on the event alone would
    # block forever if the server task failed before setting it.
    started_waiter = asyncio.create_task(started_event.wait())
    await asyncio.wait(
        {server_task, started_waiter},
        return_when=asyncio.FIRST_COMPLETED,
    )
    if not started_event.is_set():
        started_waiter.cancel()
        # Awaiting the finished task re-raises the exception that stopped it.
        await server_task
        return
    logging.info(f"started server on {host}:{args.port}")

    # NOTE: the client tasks are only scheduled here; each connection is
    # actually established once the event loop gets to run its task.
    for index in range(args.num_connections):
        client_task = asyncio.create_task(start_client(loop, index, host, args.port, family))
        tasks.append(client_task)
    logging.info(f"opened {args.num_connections} client connections")

    try:
        await asyncio.gather(*tasks)
    except asyncio.exceptions.CancelledError:
        logging.info("exiting")
        return
async def start_server(loop, host, port, family, started_event):
    """Create the listening server, signal readiness, then serve forever.

    Args:
        loop: event loop whose ``create_server`` is used to create the server.
        host: address to bind to.
        port: port to listen on.
        family: socket address family passed through to ``create_server``.
        started_event: event that is set once the server has been created.
    """
    def protocol_factory():
        # One fresh protocol instance per accepted connection.
        return ServerProtocol()

    listener = await loop.create_server(
        protocol_factory,
        host=host,
        port=port,
        family=family,
    )

    # Readiness is signalled only after create_server() has returned.
    started_event.set()

    async with listener:
        await listener.serve_forever()
class ServerProtocol(asyncio.Protocol):
    """Server-side protocol that replies ``b"pong"`` to every received chunk."""

    def connection_made(self, transport):
        """Keep the transport so that replies can be written to it later."""
        self.transport = transport
        peer = transport.get_extra_info("peername")
        logging.debug(f"server received connection: {peer}")

    def data_received(self, data):
        """Log the received bytes and answer with ``b"pong"``."""
        # The decoded text is used for logging only. errors="replace" keeps
        # undecodable bytes (or a multi-byte character split across two
        # reads) from raising UnicodeDecodeError before the reply is written.
        message = data.decode(errors="replace")
        logging.debug(f"server received message: {message}")
        self.transport.write(b"pong")
async def start_client(loop, client_idx, host, port, family):
    """Open one client connection and ping the server until the link closes.

    Args:
        loop: event loop whose ``create_connection`` is used to connect.
        client_idx: index of this client; used in the ping payload and passed
            to the ``ClientProtocol`` created for the connection.
        host: address to connect to.
        port: port to connect to.
        family: socket address family passed through to ``create_connection``.
    """
    # The protocol instance is not needed here; only the transport is used.
    transport, _protocol = await loop.create_connection(
        lambda: ClientProtocol(client_idx),
        host=host, port=port, family=family)
    message = f"ping {client_idx}".encode()
    try:
        # Stop once the transport is closing or closed instead of writing to
        # a dead connection forever.
        while not transport.is_closing():
            transport.write(message)
            # Random 10-20 second pause between pings.
            delay = 10 + (random.random() * 10)
            await asyncio.sleep(delay)
    finally:
        transport.close()
class ClientProtocol(asyncio.Protocol):
    """Client-side protocol that logs when its connection opens and closes."""

    def __init__(self, client_idx):
        """Store the index used to identify this client in log messages."""
        self.client_idx = client_idx

    def connection_made(self, transport):
        """Log the peer address of the newly established connection."""
        peer = transport.get_extra_info("peername")
        text = f"client {self.client_idx} made connection: {peer}"
        logging.debug(text)

    def connection_lost(self, exc):
        """Log that the connection ended, together with ``exc``."""
        text = f"client {self.client_idx} closed connection: {exc}"
        logging.debug(text)
if __name__ == "__main__":
    # Script entry point: build the main coroutine and run it to completion
    # on a fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)