Skip to content

Commit 338a55d

Browse files
committed
Working RMG Adapter
1 parent b8a8a04 commit 338a55d

File tree

3 files changed

+126
-50
lines changed

3 files changed

+126
-50
lines changed

t3/main.py

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -631,45 +631,17 @@ def run_rmg(self, restart_rmg: bool = False):
631631
walltime=self.t3['options']['max_RMG_walltime'],
632632
max_iterations=self.t3['options']['max_rmg_iterations'],
633633
verbose=self.verbose,
634-
t3_project_name=self.project
634+
t3_project_name=self.project,
635+
restart_rmg=restart_rmg,
635636
)
636637
rmg_adapter.run_rmg()
637638
if not os.path.isfile(self.paths['RMG input']):
638639
raise ValueError(f"The RMG input file {self.paths['RMG input']} could not be written.")
639640
tic = datetime.datetime.now()
640641

641642
max_rmg_exceptions_allowed = self.t3['options']['max_RMG_exceptions_allowed']
642-
643643

644-
645-
646-
647-
648-
#
649-
650-
write_rmg_input_file(
651-
rmg=self.rmg,
652-
t3=self.t3,
653-
iteration=self.iteration,
654-
path=self.paths['RMG input'],
655-
walltime=self.t3['options']['max_RMG_walltime'],
656-
)
657-
if not os.path.isfile(self.paths['RMG input']):
658-
raise ValueError(f"The RMG input file {self.paths['RMG input']} could not be written.")
659-
tic = datetime.datetime.now()
660-
661-
max_rmg_exceptions_allowed = self.t3['options']['max_RMG_exceptions_allowed']
662-
rmg_exception_encountered = rmg_runner(rmg_input_file_path=self.paths['RMG input'],
663-
job_log_path=self.paths['RMG job log'],
664-
logger=self.logger,
665-
memory=self.rmg['memory'] * 1000 if self.rmg['memory'] is not None else None,
666-
max_iterations=self.t3['options']['max_rmg_iterations'],
667-
verbose=self.verbose,
668-
t3_project_name=self.project,
669-
rmg_execution_type=self.rmg['rmg_execution_type'],
670-
restart_rmg=restart_rmg,
671-
)
672-
if rmg_exception_encountered:
644+
if rmg_adapter.rmg_exception_encountered:
673645
self.rmg_exceptions_counter += 1
674646
if self.rmg_exceptions_counter > max_rmg_exceptions_allowed:
675647
self.logger.error(f'This is the {get_number_with_ordinal_indicator(self.rmg_exceptions_counter)} '
@@ -681,10 +653,14 @@ def run_rmg(self, restart_rmg: bool = False):
681653
f'This is the {get_number_with_ordinal_indicator(self.rmg_exceptions_counter)} '
682654
f'exception raised by RMG.\n'
683655
f'The maximum number of exceptions allowed is {max_rmg_exceptions_allowed}.')
684-
685656
elapsed_time = time_lapse(tic)
686657
self.logger.info(f'RMG terminated, execution time: {elapsed_time}')
687658

659+
660+
661+
662+
663+
#
688664
def determine_species_and_reactions_to_calculate(self) -> bool:
689665
"""
690666
Determine which species and reactions in the executed RMG job should be calculated.

t3/runners/rmg_adapter.py

Lines changed: 109 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import math
44
import shutil
55
import datetime
6+
import time
67

78
from mako.template import Template
89
from t3.utils.writer import to_camel_case
@@ -18,6 +19,7 @@
1819
}
1920

2021
RMG_EXECUTION_TYPE = settings['execution_type']['rmg']
22+
MAX_RMG_RUNS_PER_ITERATION = 5 # TODO: Why is this hard-coded?
2123
submit_filenames = settings['submit_filenames']
2224
rmg_memory = settings['rmg_initial_memory']
2325
if RMG_EXECUTION_TYPE == 'queue':
@@ -70,37 +72,68 @@ def __init__(self,
7072
self.server = server or SERVER
7173
self.testing = testing
7274
self.logger = logger
73-
75+
self.previous_job_status = None
76+
self.time_running = 0
77+
self.restart_rmg = restart_rmg
7478

7579
if not os.path.isdir(local_t3_path):
7680
os.makedirs(local_t3_path)
7781

7882
self.files_to_upload = list()
7983
self.folder_to_download = None
80-
84+
self.rmg_errors = list()
85+
self.rmg_run_count = 0
86+
8187
def run_rmg(self):
8288
"""
8389
Run RMG
8490
"""
85-
self.set_cpu_and_mem()
86-
self.set_file_paths()
87-
self.set_files()
88-
8991
if self.rmg_execution_type == 'incore':
9092
self.execute_incore()
91-
elif self.rmg_execution_type == 'queue':
92-
self.execute_queue()
93-
# While the job is running, periodically check the status of the job
94-
while self.job_status == 'running':
95-
self.determine_rmg_job_status()
96-
# Once the job is done, download the results
97-
if self.job_status == 'done':
98-
self.download_files()
93+
elif self.rmg_execution_type == 'local' or self.rmg_execution_type == 'queue':
94+
while self.cont_run_rmg:
95+
self.set_cpu_and_mem()
96+
self.set_file_paths()
97+
self.set_files()
98+
99+
100+
if self.rmg_execution_type == 'queue':
101+
self.rmg_run_count += 1
102+
self.execute_queue()
103+
# While the job is running, periodically check the status of the job
104+
while self.job_status == 'running':
105+
# Wait for 5 minutes before checking again
106+
time.sleep(300)
107+
self.time_running += 300
108+
self.determine_rmg_job_status()
109+
# Log the status of the job
110+
# If we do it every 5 mins, the log file will be flooded with the same message
111+
# So only log if the status has changed or if 30 mins have passed
99112

113+
if self.job_status != self.previous_job_status or self.time_running % 1800 == 0:
114+
self.logger.info(f'RMG-{self.iteration}_iteration job status: {self.job_status}')
115+
self.previous_job_status = self.job_status
100116

117+
# Once the job is done, download the results
118+
if self.job_status == 'done':
119+
# Log that the job is done and will download the results
120+
self.logger.info(f'RMG-{self.iteration}_iteration job status: {self.job_status}, downloading results...')
121+
self.download_files()
122+
123+
# Need to check for convergence or errors
124+
self.check_convergance()
125+
# Get local err file path
126+
err_path = os.path.join(self.local_rmg_path, 'err.txt')
127+
if os.path.isfile(err_path):
128+
os.rename(err_path, os.path.join(self.local_rmg_path, f'err_{datetime.datetime.now().strftime("%b%d_%Y_%H:%M:%S")}.txt'))
129+
self.rmg_errors.append(self.error)
101130
else:
102131
raise ValueError(f'RMG execution type {self.rmg_execution_type} is not supported.')
103132

133+
# set self.rmg_exceptions the opposite of self.rmg_converged
134+
self.rmg_exception_encountered = not self.rmg_converged
135+
136+
104137
def write_rmg_input_file(self):
105138
"""
106139
Write an RMG input file to the given file path.
@@ -424,7 +457,6 @@ def set_cpu_and_mem(self):
424457
self.submit_script_memory = math.ceil(total_submit_script_memory) # in MB
425458
self.set_input_file_memory()
426459

427-
428460
def set_files(self) -> None:
429461
"""
430462
Set files to be uploaded and downloaded. Writes the files if needed.
@@ -444,17 +476,35 @@ def set_files(self) -> None:
444476
if self.rmg_execution_type != 'incore':
445477
# We need a submit script (submitted local or SSH)
446478
self.write_submit_script()
479+
447480
self.files_to_upload.append(self.get_file_property_dictionary(
448481
file_name=submit_filenames[CLUSTER_SOFT]))
449482
# 1.2. RMG input file
450483
self.write_rmg_input_file()
484+
# If this a restart, we need to upload the restart file
485+
if self.restart_rmg:
486+
restart_string = "restartFromSeed(path='seed')"
487+
with open(self.rmg_input_file_path, 'r') as f:
488+
content = f.read()
489+
seed_path = os.path.join(self.local_rmg_path, 'seed')
490+
if restart_string not in content and os.path.isdir(seed_path) and os.listdir(seed_path):
491+
if os.path.isfile(os.path.join(self.local_rmg_path, 'restart_from_seed.py')):
492+
os.rename(src=os.path.join(self.local_rmg_path, 'input.py'),
493+
dst=os.path.join(self.local_rmg_path, 'input.py.old'))
494+
os.rename(src=os.path.join(self.local_rmg_path, 'restart_from_seed.py'),
495+
dst=os.path.join(self.local_rmg_path, 'input.py'))
496+
elif os.path.isfile(os.path.join(self.local_rmg_path, 'input.py')):
497+
with open(os.path.join(self.local_rmg_path, 'input.py'), 'r') as f:
498+
content = f.read()
499+
with open(os.path.join(self.local_rmg_path, 'input.py'), 'w') as f:
500+
f.write(restart_string + '\n\n' + content)
501+
451502
self.files_to_upload.append(self.get_file_property_dictionary(file_name='input.py'))
452503

453504
# 2. ** Download **
454505
# 2.1. RMG output folder
455506
self.folder_to_download = self.remote_path
456507

457-
458508
def set_additional_file_paths(self) -> None:
459509
"""
460510
Set additional file paths to be uploaded and downloaded.
@@ -589,6 +639,7 @@ def upload_files(self) -> None:
589639
except shutil.SameFileError:
590640
pass
591641
self.initial_time = datetime.datetime.now()
642+
592643
def determine_rmg_job_status(self) -> None:
593644
"""
594645
Determine the RMG job status.
@@ -608,3 +659,45 @@ def download_files(self) -> None:
608659
# Also, even if the job is submitted to the queue, no need to download files if the server is local.
609660
with SSHClient(self.server) as ssh:
610661
ssh.download_folder(remote_folder_path=self.remote_path, local_folder_path=self.local_rmg_path)
662+
663+
def check_convergance(self) -> None:
664+
self.rmg_converged, self.error = False, None
665+
rmg_log_path = os.path.join(self.local_rmg_path, 'RMG.log')
666+
rmg_err_path = os.path.join(self.local_rmg_path, 'err.txt')
667+
if os.path.isfile(rmg_log_path):
668+
with open(rmg_log_path, 'r') as f:
669+
# Read the lines of the log file in reverse order
670+
lines = f.readlines()[::-1]
671+
for line in lines:
672+
if 'MODEL GENERATION COMPLETED' in line:
673+
self.rmg_converged = True
674+
break
675+
if not self.rmg_converged and os.path.isfile(rmg_err_path):
676+
with open(rmg_err_path, 'r') as f:
677+
lines = f.readlines()[::-1]
678+
for line in lines:
679+
if 'Error' in line:
680+
self.error = line.strip()
681+
break
682+
return self.rmg_converged, self.error
683+
684+
def convergance_failure(self) -> None:
685+
686+
if not self.rmg_converged:
687+
if self.error is not None:
688+
self.logger.error(f'RMG job {self.job_id} failed with error: {self.error}')
689+
# Check if memory was the failure - TODO: this is not working
690+
#new_memory = self.get_new_memory_rmg_run()
691+
692+
self.cont_run_rmg = not self.rmg_converged \
693+
and self.rmg_run_count < MAX_RMG_RUNS_PER_ITERATION \
694+
and not(len(self.rmg_errors)) >=2 and self.error is not None \
695+
and self.error == self.rmg_errors[-2]
696+
self.restart_rmg = False if self.error is not None and 'Could not find one or more of the required files/directories ' \
697+
'for restarting from a seed mechanism' in self.error else True
698+
699+
def get_new_memory_rmg_run(self) -> None:
700+
"""
701+
Get a new memory for the RMG job.
702+
"""
703+
pass

t3/utils/ssh.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import os
1212
import time
1313
from typing import Any, Callable, List, Optional, Tuple, Union
14+
import stat
1415

1516
import paramiko
1617

@@ -226,12 +227,18 @@ def download_folder(self,
226227
remote_filepath = remote_folder_path + '/' + filename
227228
local_filepath = os.path.join(local_folder_path, filename)
228229

230+
231+
229232
# Download files
230-
if item.isfile():
233+
# stat.S_ISREG(item.st_mode) is True if item is a file (not a folder)
234+
if stat.S_ISREG(item.st_mode):
231235
self._sftp.get(remote_filepath, local_filepath)
232236

233237
# Recursively download folders
234-
elif item.isdir():
238+
# stat.S_ISDIR(item.st_mode) is True if item is a folder
239+
elif stat.S_ISDIR(item.st_mode):
240+
# create the folder in the local path
241+
os.makedirs(local_filepath, exist_ok=True)
235242
self.download_folder(remote_filepath, local_filepath)
236243
except IOError:
237244
logger.warning(f'Got an IOError when trying to download file '

0 commit comments

Comments
 (0)