Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions swmm-toolkit/src/swmm/toolkit/output.i
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,20 @@ and return a (possibly) different pointer */

%apply int *OUTPUT {
int *version,
int *time
int *time,
int *year,
int *month,
int *day,
int *hour,
int *minute,
int *second,
int *dayOfWeek
}

%cstring_output_allocate_size(char **elementName, int *size, SMO_freeMemory(*$1));


/* TYPEMAPS FOR MEMORY MANAGEMNET OF FLOAT ARRAYS */
/* TYPEMAPS FOR MEMORY MANAGEMENT OF FLOAT ARRAYS */
%typemap(in, numinputs=0)float **float_out (float *temp), int *int_dim (int temp){
$1 = &temp;
}
Expand All @@ -84,6 +91,23 @@ and return a (possibly) different pointer */
}


/* TYPEMAPS FOR MEMORY MANAGEMENT OF DOUBLE ARRAYS */
%typemap(in, numinputs=0)double **double_out (double *temp), int *int_dim (int temp){
$1 = &temp;
}
%typemap(argout) (double **double_out, int *int_dim) {
if (*$1) {
PyObject *o = PyList_New(*$2);
double* temp = *$1;
for(int i=0; i<*$2; i++) {
PyList_SetItem(o, i, PyFloat_FromDouble((double)temp[i]));
}
$result = SWIG_AppendOutput($result, o);
SMO_freeMemory(*$1);
}
}


/* TYPEMAPS FOR MEMORY MANAGEMENT OF INT ARRAYS */
%typemap(in, numinputs=0)int **int_out (int *temp), int *int_dim (int temp){
$1 = &temp;
Expand Down Expand Up @@ -151,6 +175,8 @@ and return a (possibly) different pointer */
%ignore SMO_clearError;
%ignore SMO_checkError;

%noexception SMO_decodeDate;

%include "swmm_output.h"

%exception;
4 changes: 4 additions & 0 deletions swmm-toolkit/src/swmm/toolkit/output_rename.i
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
%rename(get_times) SMO_getTimes;
%rename(get_elem_name) SMO_getElementName;

%rename(get_date_time) SMO_getDateTime;
%rename(get_date_series) SMO_getDateSeries;
%rename(decode_date) SMO_decodeDate;

%rename(get_subcatch_series) SMO_getSubcatchSeries;
%rename(get_node_series) SMO_getNodeSeries;
%rename(get_link_series) SMO_getLinkSeries;
Expand Down
2 changes: 1 addition & 1 deletion swmm-toolkit/swmm-solver
59 changes: 59 additions & 0 deletions swmm-toolkit/tests/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,65 @@ def test_getelementname(handle):
assert output.get_elem_name(handle, shared_enum.ElementType.NODE, 1) == "10"



def test_getdatetime(handle):
date0 = output.get_date_time(handle, 0)
date1 = output.get_date_time(handle, 1)
assert isinstance(date0, (float, np.floating))

step_seconds = output.get_times(handle, shared_enum.Time.REPORT_STEP)
step_days = step_seconds / 86400.0

# consecutive timestamps differ by exactly one report step (in days)
assert np.isclose(date1 - date0, step_days)

# first timestamp should be strictly after the saved start date anchor
assert date0 > output.get_start_date(handle)


def test_getdateseries(handle):
start, end = 0, 5
dates = output.get_date_series(handle, start, end)

assert len(dates) == end - start + 1

step_days = output.get_times(handle, shared_enum.Time.REPORT_STEP) / 86400.0
diffs = np.diff(dates)

# monotonic and evenly spaced by report step
assert np.allclose(diffs, step_days)
assert np.isclose(dates[-1], dates[0] + (end - start) * step_days)


def test_decodedate(handle):
# decoded components are plausible
date0 = output.get_date_time(handle, 0)
y, m, d, hh, mm, ss, dow = output.decode_date(date0)

assert 1 <= m <= 12
assert 1 <= d <= 31
assert 0 <= hh <= 23
assert 0 <= mm <= 59
assert 0 <= ss <= 59
assert 1 <= dow <= 7

# consecutive decode respects the report step
date1 = output.get_date_time(handle, 1)
y1, m1, d1, hh1, mm1, ss1, dow1 = output.decode_date(date1)

step_seconds = output.get_times(handle, shared_enum.Time.REPORT_STEP)
step_hours = (step_seconds // 3600) % 24

# minutes/seconds remain constant for steps divisible by 60s
if step_seconds % 60 == 0:
assert mm1 == mm
assert ss1 == ss

# hour advances by step_hours modulo 24 (day rollover allowed)
assert ((hh1 - hh) % 24) == step_hours



def test_getsubcatchseries(handle):

ref_array = np.array([0.0,
Expand Down
160 changes: 160 additions & 0 deletions swmm-toolkit/tests/test_series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from datetime import datetime, timedelta
import os
import pytest
from swmm.toolkit import solver, output, shared_enum

DATA_PATH = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
INPUT_FILE = os.path.join(DATA_PATH, "test_Example1.inp")
REPORT_FILE = os.path.join(DATA_PATH, "temp_align.rpt")
OUTPUT_FILE = os.path.join(DATA_PATH, "temp_align.out")

REPORT_STEP_SECONDS = 3600 # Example1


def _correct_decimal_digits(t, r):
"""
Correct Decimal Digits (CDD) is computed as a bounded form of
``-log10(abs(test_value - ref_value))``, which approximates how many
decimal digits of ``test_value`` agree with ``ref_value``.
"""
import math

if t == r:
return 10.0

tmp = abs(t - r)
if tmp < 1.0e-7:
tmp = 1.0e-7
elif tmp > 2.0:
tmp = 1.0

tmp = -math.log10(tmp)
if tmp < 0.0:
tmp = 0.0

return tmp


def check_cdd_float(test: list[float], ref: list[float], cdd_tol: int) -> bool:
"""
Check the minimum number of correct decimal digits (CDD) between two
float sequences. This function finds the minimum CDD over all element
pairs in ``test`` and ``ref``, then checks whether ``floor(min_cdd)``
is greater than or equal to ``cdd_tol``.

Parameters
----------
test : list[float]
Sequence of test values to be compared.
ref : list[float]
Sequence of reference values used as the expected results.
cdd_tol : int
Required minimum number of correct decimal digits (integer threshold)
that the minimum CDD over all pairs must meet or exceed.

Returns
-------
bool
``True`` if ``test`` and ``ref`` have the same length and
``floor(min_cdd) >= cdd_tol``; ``False`` otherwise (including if the
sequences differ in length).
"""
import math

if len(test) != len(ref):
return False

min_cdd = 10.0

for t, r in zip(test, ref):
tmp = _correct_decimal_digits(t, r)

if tmp < min_cdd:
min_cdd = tmp

return math.floor(min_cdd) >= cdd_tol


def _get_current_datetime():
y, m, d, hh, mm, ss = solver.simulation_get_current_datetime()
return datetime(y, m, d, hh, mm, ss)

def build_link_flow_solver_tuples_aligned():
tuples = []
solver.swmm_open(INPUT_FILE, REPORT_FILE, OUTPUT_FILE)
try:
solver.swmm_start(0)
# After start callback
# period_end = _get_current_datetime
# value = solver.link_get_result(0, shared_enum.LinkResult.FLOW)
# tuples.append((period_end, value))

while True:
# Before step callback
#
time_left = solver.swmm_stride(REPORT_STEP_SECONDS)
# After step callback
#
if time_left == 0:
break
# Value for the interval that just ended; align to its period-end timestamp
period_end = _get_current_datetime() - timedelta(seconds=REPORT_STEP_SECONDS)
value = solver.link_get_result(0, shared_enum.LinkResult.FLOW)
tuples.append((period_end, value))

# Before end callback
period_end = _get_current_datetime() - timedelta(seconds=REPORT_STEP_SECONDS)
value = solver.link_get_result(0, shared_enum.LinkResult.FLOW)
tuples.append((period_end, value))

solver.swmm_end()
# After end callback
#

finally:
solver.swmm_close()
# After close callback
#
return tuples

def build_link_flow_output_tuples():
EPOCH_SWMM = datetime(1899, 12, 30)
h = output.init()
output.open(h, os.path.join(DATA_PATH, "test_Example1.out"))
try:
start_days = output.get_start_date(h)
rpt = output.get_times(h, shared_enum.Time.REPORT_STEP)
n = output.get_times(h, shared_enum.Time.NUM_PERIODS)
start_dt = EPOCH_SWMM + timedelta(days=start_days)
vals = output.get_link_series(h, 0, shared_enum.LinkAttribute.FLOW_RATE, 0, n - 1)
tuples = [(start_dt + timedelta(seconds=i * rpt), float(vals[i])) for i in range(n)]
finally:
output.close(h)
return tuples

def test_compare_aligned_series():
s = build_link_flow_solver_tuples_aligned()
o = build_link_flow_output_tuples()

# times must match
solver_times = [t.strftime("%Y-%m-%d %H:%M:%S") for t, _ in s]
output_times = [t.strftime("%Y-%m-%d %H:%M:%S") for t, _ in o]
assert solver_times == output_times, (
"Time axes differ.\n"
f"Solver times: {solver_times[:5]} ...\n"
f"Output times: {output_times[:5]} ..."
)

# values should match within tolerance
solver_vals = [v for _, v in s]
output_vals = [v for _, v in o]

assert check_cdd_float(solver_vals, output_vals, 1), (
"Solver and output values differ. "
"See zipped output for details:\n" +
"\n".join(
f"{t1.strftime('%Y-%m-%d %H:%M:%S')} | {v1:.6f} || {t2.strftime('%Y-%m-%d %H:%M:%S')} | {v2:.6f} | cdd={cdd(v1, v2):.2f}"
for (t1, v1), (t2, v2) in list(zip(s, o))[:10]
)
)