Skip to content

Commit e91849f

Browse files
Mission completed
1 parent c825116 commit e91849f

File tree

3 files changed

+519
-0
lines changed

3 files changed

+519
-0
lines changed

client.py

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
import socket
2+
import threading
3+
import logging
4+
import protocol
5+
import sys
6+
import shlex
7+
import time
8+
import os
9+
10+
# logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(message)s')
11+
logging.basicConfig(level=logging.INFO, format='%(message)s')
12+
13+
TARGET_SERVER_IP = '127.0.0.1'
14+
TARGET_SERVER_PORT = 9999
15+
16+
if len(sys.argv) != 2:
17+
print("Usage: python client.py <my_p2p_port>")
18+
sys.exit(1)
19+
20+
try:
21+
MY_2P2_PORT = int(sys.argv[1])
22+
except ValueError:
23+
print("Invalid port number. Please provide a valid integer.")
24+
sys.exit(1)
25+
26+
MY_HOSTNAME = socket.gethostname()
27+
28+
stop_event = threading.Event() # Sự kiện để dừng luồng lắng nghe P2P
29+
30+
# Bắt đầu luồng lắng nghe kết nối P2P
31+
def start_p2p_listener(p2p_port):
32+
p2p_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
33+
p2p_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
34+
try:
35+
p2p_socket.bind(('', p2p_port))
36+
p2p_socket.listen(5)
37+
logging.info(f"P2P listener started on port {p2p_port}")
38+
39+
p2p_socket.settimeout(1.0) # Check stop_event 1 giây một lần
40+
41+
while not stop_event.is_set():
42+
try:
43+
peer_connection, peer_address = p2p_socket.accept()
44+
logging.info(f"Accepted connection from {peer_address}")
45+
peer_handler = threading.Thread(target=handle_peer, args=(peer_connection, peer_address))
46+
peer_handler.daemon = True
47+
peer_handler.start()
48+
except socket.timeout:
49+
continue
50+
except Exception as e:
51+
if stop_event.is_set():
52+
break
53+
logging.error(f"P2P listener error: {e}")
54+
except Exception as e:
55+
logging.error(f"P2P listener error: {e}")
56+
finally:
57+
p2p_socket.close()
58+
59+
def handle_peer(peer_socket, peer_address):
60+
thread_name = threading.current_thread().name
61+
logging.info(f"[{thread_name}] Handling peer {peer_address}")
62+
try:
63+
message = protocol.receive_message(peer_socket) # Chờ nhận yêu cầu xin file từ peer
64+
if message and message.get('action') == 'get_file':
65+
lname = message.get('lname') # Xử lý yêu cầu xin file từ peer
66+
logging.info(f"[{thread_name}] Peer {peer_address} requested file {lname}")
67+
if not lname or not os.path.exists(lname):
68+
logging.warning(f"File {lname} does not exist.")
69+
else:
70+
# Gửi file cho peer
71+
logging.info(f"[{thread_name}] Start sending file {lname} to {peer_address}")
72+
with open(lname, 'rb') as file:
73+
while True:
74+
chunk = file.read(4096)
75+
if not chunk:
76+
break
77+
peer_socket.sendall(chunk)
78+
logging.info(f"[{thread_name}] Finished sending file {lname} to {peer_address}")
79+
else:
80+
logging.warning(f"[{thread_name}] Invalid request from peer {peer_address}")
81+
except Exception as e:
82+
logging.error(f"[{thread_name}] Error handling peer {peer_address}: {e}")
83+
finally:
84+
peer_socket.close()
85+
logging.info(f"[{thread_name}] Closed connection with peer {peer_address}")
86+
87+
def do_publish(socket, lname, fname):
88+
if not os.path.exists(lname):
89+
logging.error(f"File {lname} does not exist.")
90+
return
91+
publish_message = {'action': 'publish', 'lname': lname, 'fname': fname}
92+
if protocol.send_message(socket, publish_message):
93+
response = protocol.receive_message(socket)
94+
logging.info(f"Publish response: {response}")
95+
else:
96+
logging.error("Failed to send publish message.")
97+
98+
def download_request_to_peer(chosen_peer, fname_to_save):
99+
logging.info("Starting download from peer...")
100+
peer_ip = chosen_peer['ip']
101+
peer_port = chosen_peer['port']
102+
lname_on_peer = chosen_peer['lname']
103+
104+
logging.info(f"Connecting to peer at IP: {peer_ip}, Port: {peer_port}...")
105+
106+
p2p_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107+
p2p_socket.settimeout(10) # Thiết lập timeout kết nối
108+
109+
try:
110+
p2p_socket.connect((peer_ip, peer_port))
111+
logging.info("Connected to peer.")
112+
request_message = {'action': 'get_file', 'lname': lname_on_peer}
113+
protocol.send_message(p2p_socket, request_message)
114+
logging.info("Request sent to peer, starting to receive file...")
115+
116+
bytes_downloaded = 0
117+
with open(fname_to_save, 'wb') as file:
118+
while True:
119+
chunk = p2p_socket.recv(4096)
120+
if not chunk:
121+
break
122+
file.write(chunk)
123+
bytes_downloaded += len(chunk)
124+
logging.info(f"Download completed. Total bytes downloaded: {bytes_downloaded} bytes.")
125+
except socket.timeout:
126+
logging.error(f"Error: Over 10s, Peer {peer_ip}:{peer_port} did not respond.")
127+
except Exception as e:
128+
logging.error(f"Error downloading file from peer: {e}")
129+
finally:
130+
p2p_socket.close()
131+
logging.info("--- End of P2P download ---")
132+
133+
134+
def do_fetch(socket, fname):
135+
fetch_message = {'action': 'fetch', 'fname': fname}
136+
if protocol.send_message(socket, fetch_message):
137+
response = protocol.receive_message(socket)
138+
139+
if response and response.get('status') == 'success':
140+
peer_list = response.get('peer_list', [])
141+
if peer_list:
142+
logging.info(f"File {fname} is available from the following peer(s):")
143+
for i, peer in enumerate(peer_list):
144+
logging.info(f" [{i+1}] Hostname: {peer['hostname']}, IP: {peer['ip']}, Port: {peer['port']}")
145+
146+
chosen_index = 0
147+
if len(peer_list) > 1:
148+
try:
149+
choice_str = input(f"Enter 1 number from 1 to {len(peer_list)} to choose a peer (default = 1): ")
150+
if not choice_str:
151+
chosen_int = 1
152+
else:
153+
chosen_int = int(choice_str)
154+
155+
if 1 <= chosen_int <= len(peer_list):
156+
chosen_index = chosen_int - 1
157+
else:
158+
logging.warning("Invalid choice, defaulting to 1.")
159+
chosen_index = 0
160+
except ValueError:
161+
logging.warning("Invalid input, defaulting to 1.")
162+
chosen_index = 0
163+
chosen_peer = peer_list[chosen_index]
164+
logging.info(f"Decided to download from peer: Hostname: {chosen_peer['hostname']}, IP: {chosen_peer['ip']}, Port: {chosen_peer['port']}")
165+
166+
if os.path.exists(fname):
167+
overwrite = input(f"File '{fname}' already exists. Overwrite? (y/n): ").lower()
168+
if overwrite != 'y':
169+
logging.info("Download cancelled by user.")
170+
return
171+
download_request_to_peer(chosen_peer, fname)
172+
else:
173+
logging.error(f"Fetch failed or no response received: {response}")
174+
else:
175+
logging.error("Failed to send fetch message.")
176+
177+
178+
def run_client(p2p_port):
179+
threading.current_thread().name = "Main Thread"
180+
logging.info(f"Client starting with P2P port: {p2p_port}")
181+
182+
p2p_thread = threading.Thread(target=start_p2p_listener, args=(p2p_port,))
183+
p2p_thread.daemon = True
184+
p2p_thread.start()
185+
186+
time.sleep(1) # Đợi một chút để luồng lắng nghe P2P khởi động
187+
188+
client_to_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
189+
190+
try:
191+
logging.info(f"Connecting to server at IP: {TARGET_SERVER_IP} - Port:{TARGET_SERVER_PORT}...")
192+
client_to_server_socket.connect((TARGET_SERVER_IP, TARGET_SERVER_PORT))
193+
logging.info("Connection established.")
194+
195+
intro_message = {'action': 'hello', 'hostname': MY_HOSTNAME, 'p2p_port': p2p_port}
196+
protocol.send_message(client_to_server_socket, intro_message)
197+
198+
response = protocol.receive_message(client_to_server_socket)
199+
if response:
200+
logging.info(f"Received response from server: {response}")
201+
else:
202+
logging.warning("No response received from server.")
203+
204+
while True:
205+
cmd_line = input(f"Enter publish <lname> <fname>/ fetch <fname>/ exit: ")
206+
if not cmd_line:
207+
continue
208+
try:
209+
parts = shlex.split(cmd_line)
210+
action = parts[0].lower()
211+
if action == 'publish' and len(parts) == 3:
212+
lname = parts[1]
213+
fname = parts[2]
214+
# logging.info(f"Publishing file: {fname} with logical name: {lname}")
215+
do_publish(client_to_server_socket, lname, fname)
216+
217+
elif action == 'fetch' and len(parts) == 2:
218+
fname = parts[1]
219+
# logging.info(f"Fetching file: {fname}")
220+
do_fetch(client_to_server_socket, fname)
221+
222+
elif action == 'exit':
223+
logging.info("Exiting client.")
224+
break
225+
else:
226+
logging.warning(f"Invalid command: {cmd_line}")
227+
except Exception as e:
228+
logging.error(f"Error processing command: {e}")
229+
230+
except socket.error as e:
231+
logging.error(f"Connection failed: {e}")
232+
except KeyboardInterrupt:
233+
logging.info("Client interrupted.")
234+
except Exception as e:
235+
logging.error(f"An error occurred: {e}")
236+
finally:
237+
stop_event.set() # Yêu cầu dừng luồng lắng nghe P2P
238+
client_to_server_socket.close()
239+
p2p_thread.join()
240+
logging.info("Connection closed.")
241+
242+
if __name__ == "__main__":
243+
run_client(MY_2P2_PORT)

protocol.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import socket
2+
import struct
3+
import json
4+
import logging
5+
6+
HEADER_LENGTH = 4 # Kích thước header để lưu độ dài dữ liệu
7+
8+
def send_message(sock, message_dict):
9+
try:
10+
message_bytes = json.dumps(message_dict).encode('utf-8')
11+
header_bytes = struct.pack('!I', len(message_bytes)) # Đóng gói độ dài dữ liệu thành 4 byte
12+
sock.sendall(header_bytes + message_bytes)
13+
return True
14+
except Exception as e:
15+
print(f"Error sending message: {e}")
16+
return False
17+
18+
def receive_message(sock):
19+
try:
20+
# Đọc header để lấy độ dài dữ liệu
21+
header_bytes = sock.recv(HEADER_LENGTH)
22+
if not header_bytes:
23+
# logging.warning("No header received")
24+
return None
25+
message_length = struct.unpack('!I', header_bytes)[0]
26+
27+
# Đọc dữ liệu dựa trên độ dài đã nhận
28+
message_bytes_list = []
29+
bytes_received = 0
30+
while bytes_received < message_length:
31+
chunk = sock.recv(min(message_length - bytes_received, 4096))
32+
if not chunk:
33+
# logging.warning("Connection closed before receiving full message")
34+
return None
35+
message_bytes_list.append(chunk)
36+
bytes_received += len(chunk)
37+
38+
message_bytes = b''.join(message_bytes_list)
39+
message_dict = json.loads(message_bytes.decode('utf-8'))
40+
return message_dict
41+
42+
except Exception as e:
43+
logging.error(f"Error receiving message: {e}")
44+
return None

0 commit comments

Comments
 (0)