|
| 1 | +import socket |
| 2 | +import threading |
| 3 | +import logging |
| 4 | +import protocol |
| 5 | +import sys |
| 6 | +import shlex |
| 7 | +import time |
| 8 | +import os |
| 9 | +from datetime import datetime |
| 10 | + |
| 11 | +# logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(message)s') |
| 12 | +logging.basicConfig(level=logging.INFO, format='%(message)s') |
| 13 | + |
| 14 | +class Client: |
| 15 | + def __init__(self, server_ip, server_port, p2p_port, hostname=None): |
| 16 | + self.server_ip = server_ip |
| 17 | + self.server_port = server_port |
| 18 | + self.p2p_port = p2p_port |
| 19 | + self.hostname = hostname or socket.gethostname() |
| 20 | + self.stop_event = threading.Event() # Sự kiện để dừng luồng lắng nghe P2P |
| 21 | + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 22 | + |
| 23 | + # Bắt đầu luồng lắng nghe kết nối P2P |
| 24 | + def _start_p2p_listener(self): |
| 25 | + p2p_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 26 | + p2p_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 27 | + try: |
| 28 | + p2p_socket.bind(('', self.p2p_port)) |
| 29 | + p2p_socket.listen(5) |
| 30 | + logging.debug(f"P2P listener started on port {self.p2p_port}") |
| 31 | + |
| 32 | + p2p_socket.settimeout(1.0) # Check stop_event 1 giây một lần |
| 33 | + |
| 34 | + while not self.stop_event.is_set(): |
| 35 | + try: |
| 36 | + peer_connection, peer_address = p2p_socket.accept() |
| 37 | + logging.info(f"Accepted connection from {peer_address}") |
| 38 | + peer_handler = threading.Thread(target=self._handle_peer, args=(peer_connection, peer_address)) |
| 39 | + peer_handler.daemon = True |
| 40 | + peer_handler.start() |
| 41 | + except socket.timeout: |
| 42 | + continue |
| 43 | + except Exception as e: |
| 44 | + if self.stop_event.is_set(): |
| 45 | + break |
| 46 | + logging.error(f"P2P listener error: {e}") |
| 47 | + except Exception as e: |
| 48 | + logging.error(f"P2P listener error: {e}") |
| 49 | + finally: |
| 50 | + p2p_socket.close() |
| 51 | + |
| 52 | + def _handle_peer(self, peer_socket, peer_address): |
| 53 | + thread_name = threading.current_thread().name |
| 54 | + logging.info(f"[{thread_name}] Handling peer {peer_address}") |
| 55 | + try: |
| 56 | + message = protocol.receive_message(peer_socket) # Chờ nhận yêu cầu xin file từ peer |
| 57 | + if message and message.get('action') == 'get_file': |
| 58 | + lname = message.get('lname') # Xử lý yêu cầu xin file từ peer |
| 59 | + logging.info(f"[{thread_name}] Peer {peer_address} requested file {lname}") |
| 60 | + if not lname or not os.path.exists(lname): |
| 61 | + logging.warning(f"File {lname} does not exist.") |
| 62 | + else: |
| 63 | + # Gửi file cho peer |
| 64 | + logging.info(f"[{thread_name}] Start sending file {lname} to {peer_address}") |
| 65 | + with open(lname, 'rb') as file: |
| 66 | + while True: |
| 67 | + chunk = file.read(4096) |
| 68 | + if not chunk: |
| 69 | + break |
| 70 | + peer_socket.sendall(chunk) |
| 71 | + logging.info(f"[{thread_name}] Finished sending file {lname} to {peer_address}") |
| 72 | + else: |
| 73 | + logging.warning(f"[{thread_name}] Invalid request from peer {peer_address}") |
| 74 | + except Exception as e: |
| 75 | + logging.error(f"[{thread_name}] Error handling peer {peer_address}: {e}") |
| 76 | + finally: |
| 77 | + peer_socket.close() |
| 78 | + logging.info(f"[{thread_name}] Closed connection with peer {peer_address}") |
| 79 | + |
| 80 | + def _do_publish(self, lname, fname): |
| 81 | + if not os.path.exists(lname): |
| 82 | + logging.error(f"File {lname} does not exist.") |
| 83 | + return |
| 84 | + file_size = os.path.getsize(lname) |
| 85 | + last_modified = datetime.utcfromtimestamp(os.path.getmtime(lname)).isoformat() + "Z" |
| 86 | + source_ext = os.path.splitext(lname)[1] |
| 87 | + target_ext = os.path.splitext(fname)[1] |
| 88 | + if source_ext and source_ext != target_ext: |
| 89 | + fname = os.path.splitext(fname)[0] + source_ext |
| 90 | + publish_message = { |
| 91 | + 'action': 'publish', |
| 92 | + 'lname': lname, |
| 93 | + 'fname': fname, |
| 94 | + 'file_size': file_size, |
| 95 | + 'last_modified': last_modified, |
| 96 | + } |
| 97 | + if protocol.send_message(self.server_socket, publish_message): |
| 98 | + response = protocol.receive_message(self.server_socket) |
| 99 | + logging.info(f"Publish response: {response}") |
| 100 | + else: |
| 101 | + logging.error("Failed to send publish message.") |
| 102 | + |
| 103 | + def _download_from_peer(self, chosen_peer, fname_to_save): |
| 104 | + logging.info("Starting download from peer...") |
| 105 | + peer_ip = chosen_peer['ip'] |
| 106 | + peer_port = chosen_peer['port'] |
| 107 | + lname_on_peer = chosen_peer['lname'] |
| 108 | + |
| 109 | + logging.info(f"Connecting to peer at IP: {peer_ip}, Port: {peer_port}...") |
| 110 | + p2p_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 111 | + p2p_socket.settimeout(10) # Thiết lập timeout kết nối |
| 112 | + |
| 113 | + try: |
| 114 | + p2p_socket.connect((peer_ip, peer_port)) |
| 115 | + logging.info("Connected to peer.") |
| 116 | + request_message = {'action': 'get_file', 'lname': lname_on_peer} |
| 117 | + protocol.send_message(p2p_socket, request_message) |
| 118 | + bytes_downloaded = 0 |
| 119 | + with open(fname_to_save, 'wb') as file: |
| 120 | + while True: |
| 121 | + chunk = p2p_socket.recv(4096) |
| 122 | + if not chunk: |
| 123 | + break |
| 124 | + file.write(chunk) |
| 125 | + bytes_downloaded += len(chunk) |
| 126 | + except socket.timeout: |
| 127 | + logging.error(f"Error: Over 10s, Peer {peer_ip}:{peer_port} did not respond.") |
| 128 | + except Exception as e: |
| 129 | + logging.error(f"Error downloading file from peer: {e}") |
| 130 | + finally: |
| 131 | + p2p_socket.close() |
| 132 | + |
| 133 | + def _do_fetch(self, fname): |
| 134 | + fetch_message = {'action': 'fetch', 'fname': fname} |
| 135 | + if not protocol.send_message(self.server_socket, fetch_message): |
| 136 | + logging.error("Failed to send fetch message.") |
| 137 | + return |
| 138 | + |
| 139 | + response = protocol.receive_message(self.server_socket) |
| 140 | + if not response or response.get('status') != 'success': |
| 141 | + logging.error(f"Fetch failed or no response received: {response}") |
| 142 | + return |
| 143 | + |
| 144 | + peer_list = response.get('peer_list', []) |
| 145 | + if not peer_list: |
| 146 | + logging.info(f"File '{fname}' not found on any peer.") |
| 147 | + return |
| 148 | + |
| 149 | + logging.info(f"File {fname} is available from the following peer(s):") |
| 150 | + for i, peer in enumerate(peer_list): |
| 151 | + logging.info(f" [{i+1}] Hostname: {peer['hostname']}, IP: {peer['ip']}, Port: {peer['port']}") |
| 152 | + |
| 153 | + chosen_index = 0 |
| 154 | + if len(peer_list) > 1: |
| 155 | + try: |
| 156 | + choice_str = input(f"Enter 1 number from 1 to {len(peer_list)} to choose a peer (default = 1): ") |
| 157 | + chosen_int = int(choice_str) if choice_str else 1 |
| 158 | + if 1 <= chosen_int <= len(peer_list): |
| 159 | + chosen_index = chosen_int - 1 |
| 160 | + else: |
| 161 | + logging.warning("Invalid choice, defaulting to 1.") |
| 162 | + except ValueError: |
| 163 | + logging.warning("Invalid input, defaulting to 1.") |
| 164 | + |
| 165 | + chosen_peer = peer_list[chosen_index] |
| 166 | + logging.info(f"Decided to download from peer: Hostname: {chosen_peer['hostname']}, IP: {chosen_peer['ip']}, Port: {chosen_peer['port']}") |
| 167 | + |
| 168 | + if os.path.exists(fname): |
| 169 | + overwrite = input(f"File '{fname}' already exists. Overwrite? (y/n): ").lower() |
| 170 | + if overwrite != 'y': |
| 171 | + logging.info("Download cancelled by user.") |
| 172 | + return |
| 173 | + self._download_from_peer(chosen_peer, fname) |
| 174 | + |
| 175 | + def run(self): |
| 176 | + threading.current_thread().name = "Main Thread" |
| 177 | + logging.debug(f"Client starting with P2P port: {self.p2p_port}") |
| 178 | + |
| 179 | + p2p_thread = threading.Thread(target=self._start_p2p_listener, name="P2PListenerThread") |
| 180 | + p2p_thread.daemon = True |
| 181 | + p2p_thread.start() |
| 182 | + |
| 183 | + time.sleep(1) # Đợi một chút để luồng lắng nghe P2P khởi động |
| 184 | + |
| 185 | + try: |
| 186 | + logging.debug(f"Connecting to server at IP: {self.server_ip} - Port:{self.server_port}...") |
| 187 | + self.server_socket.connect((self.server_ip, self.server_port)) |
| 188 | + logging.info(f"Connected to server {self.server_ip}:{self.server_port}.") |
| 189 | + |
| 190 | + intro_message = {'action': 'hello', 'hostname': self.hostname, 'p2p_port': self.p2p_port} |
| 191 | + protocol.send_message(self.server_socket, intro_message) |
| 192 | + |
| 193 | + response = protocol.receive_message(self.server_socket) |
| 194 | + logging.info(f"Received response from server: {response}" if response else "No response from server.") |
| 195 | + |
| 196 | + while True: |
| 197 | + cmd_line = input(f"Enter publish <lname> <fname>/ fetch <fname>/ exit: ") |
| 198 | + if not cmd_line: |
| 199 | + continue |
| 200 | + try: |
| 201 | + parts = shlex.split(cmd_line) |
| 202 | + action = parts[0].lower() |
| 203 | + if action == 'publish' and len(parts) == 3: |
| 204 | + lname = parts[1] |
| 205 | + fname = parts[2] |
| 206 | + # logging.info(f"Publishing file: {fname} with logical name: {lname}") |
| 207 | + self._do_publish(lname, fname) |
| 208 | + elif action == 'fetch' and len(parts) == 2: |
| 209 | + fname = parts[1] |
| 210 | + # logging.info(f"Fetching file: {fname}") |
| 211 | + self._do_fetch(fname) |
| 212 | + elif action == 'exit': |
| 213 | + logging.info("Exiting client.") |
| 214 | + break |
| 215 | + else: |
| 216 | + logging.warning(f"Invalid command: {cmd_line}") |
| 217 | + except Exception as e: |
| 218 | + logging.error(f"Error processing command: {e}") |
| 219 | + |
| 220 | + except socket.error as e: |
| 221 | + logging.error(f"Connection failed: {e}") |
| 222 | + except (KeyboardInterrupt, EOFError): |
| 223 | + logging.info("Client interrupted.") |
| 224 | + except Exception as e: |
| 225 | + logging.error(f"An error occurred: {e}") |
| 226 | + finally: |
| 227 | + self.stop_event.set() # Yêu cầu dừng luồng lắng nghe P2P |
| 228 | + self.server_socket.close() |
| 229 | + p2p_thread.join() |
| 230 | + logging.info("Connection closed.") |
| 231 | + |
| 232 | + |
| 233 | +def _run_cli_client(p2p_port, hostname=None): |
| 234 | + client_instance = Client(server_ip='10.156.12.139', server_port=9999, p2p_port=p2p_port, hostname=hostname) |
| 235 | + client_instance.run() |
| 236 | + |
| 237 | +if __name__ == "__main__": |
| 238 | + if not (2 <= len(sys.argv) <= 3): |
| 239 | + print("Usage: python client.py <my_p2p_port> [client_name]") |
| 240 | + sys.exit(1) |
| 241 | + |
| 242 | + try: |
| 243 | + p2p_port = int(sys.argv[1]) |
| 244 | + except ValueError: |
| 245 | + print("Invalid port number. Please provide a valid integer.") |
| 246 | + sys.exit(1) |
| 247 | + |
| 248 | + client_name = sys.argv[2] if len(sys.argv) == 3 else None |
| 249 | + |
| 250 | + sys.modules.setdefault("client", sys.modules[__name__]) |
| 251 | + try: |
| 252 | + from client_ui import main as client_ui_main |
| 253 | + except Exception as exc: |
| 254 | + logging.error(f"Unable to launch client UI: {exc}") |
| 255 | + logging.info("Falling back to CLI mode.") |
| 256 | + _run_cli_client(p2p_port, client_name) |
| 257 | + else: |
| 258 | + client_ui_main( |
| 259 | + default_server_ip='localhost', |
| 260 | + default_server_port=9999, |
| 261 | + default_p2p_port=p2p_port, |
| 262 | + default_client_name=client_name, |
| 263 | + auto_connect=True, |
| 264 | + ) |
| 265 | + sys.exit(0) |
0 commit comments