Skip to content

Commit f78f718

Browse files
Merge from aws/aws-sam-cli/develop
2 parents c91f5f8 + 6738885 commit f78f718

14 files changed

+722
-32
lines changed

samcli/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
SAM CLI version
33
"""
44

5-
__version__ = "1.150.0"
5+
__version__ = "1.150.1"

samcli/lib/clients/lambda_client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,15 @@ def create(cls, host: str = "localhost", port: int = 5000, region: str = "us-wes
5353
# Create a fresh botocore session
5454
session = botocore.session.Session()
5555

56-
# Create the boto3 client with custom service model using the fresh session
56+
# Create the boto3 client using the fresh session
5757
client = session.create_client(
5858
"lambda",
5959
endpoint_url=endpoint_url,
6060
region_name=region,
61+
# the emulator doesnt access any AWS resources,
62+
# but we need _some_ credentials to create a boto client
63+
aws_access_key_id="foo",
64+
aws_secret_access_key="bar",
6165
)
6266

6367
return cls(client)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""
2+
Interactive callback handler for durable function executions.
3+
"""
4+
5+
import logging
6+
from typing import Optional
7+
8+
import click
9+
10+
from samcli.lib.clients.lambda_client import DurableFunctionsClient
11+
12+
LOG = logging.getLogger(__name__)
13+
14+
# Menu choice constants
15+
CHOICE_SUCCESS = 1
16+
CHOICE_FAILURE = 2
17+
CHOICE_HEARTBEAT = 3
18+
CHOICE_STOP = 4
19+
20+
21+
class DurableCallbackHandler:
22+
"""
23+
Handles interactive callback detection and response for durable executions.
24+
"""
25+
26+
def __init__(self, client: DurableFunctionsClient):
27+
self.client = client
28+
self._prompted_callbacks: set[str] = set() # Track which callbacks we've already prompted for
29+
30+
def check_for_pending_callbacks(self, execution_arn: str) -> Optional[str]:
31+
"""
32+
Check execution history for pending callbacks.
33+
34+
Returns:
35+
callback_id if found, None otherwise
36+
"""
37+
try:
38+
LOG.debug("Checking for pending callbacks in execution: %s", execution_arn)
39+
history = self.client.get_durable_execution_history(execution_arn)
40+
events = history.get("Events", [])
41+
42+
if events:
43+
callback_states = {}
44+
45+
for event in events:
46+
event_type = event.get("EventType")
47+
event_id = event.get("Id")
48+
49+
if event_type == "CallbackStarted":
50+
callback_id = event.get("CallbackStartedDetails", {}).get("CallbackId")
51+
callback_states[event_id] = {"callback_id": callback_id, "status": "STARTED", "event": event}
52+
elif event_type in ["CallbackCompleted", "CallbackFailed", "CallbackSucceeded"]:
53+
if event_id in callback_states:
54+
callback_states[event_id]["status"] = "COMPLETED"
55+
56+
# Find callbacks that are started but not completed
57+
for callback_id, state in callback_states.items():
58+
if state["status"] == "STARTED" and state["callback_id"]:
59+
return str(state["callback_id"])
60+
61+
except Exception as e:
62+
LOG.error("Failed to check callback history: %s", e)
63+
64+
return None
65+
66+
def prompt_callback_response(self, execution_arn: str, callback_id: str, execution_complete=None) -> bool:
67+
"""
68+
Prompt user for callback response and send it.
69+
70+
Args:
71+
execution_arn: The execution ARN for stop execution operation
72+
callback_id: The callback ID to respond to
73+
execution_complete: Optional threading.Event to check if execution finished
74+
75+
Returns:
76+
True if callback was sent, False if user chose to continue waiting
77+
"""
78+
# Only prompt once per callback ID to avoid blocking on timed-out callbacks
79+
if callback_id in self._prompted_callbacks:
80+
return False
81+
82+
self._prompted_callbacks.add(callback_id)
83+
84+
# Check if execution already completed before prompting
85+
if execution_complete and execution_complete.is_set():
86+
return False
87+
88+
click.echo(f"\n🔄 Execution is waiting for callback: {callback_id}")
89+
click.echo("Choose an action:")
90+
click.echo(" 1. Send callback success")
91+
click.echo(" 2. Send callback failure")
92+
click.echo(" 3. Send callback heartbeat")
93+
click.echo(" 4. Stop execution")
94+
95+
choice = click.prompt("Enter choice", type=click.IntRange(1, 4), default=CHOICE_SUCCESS)
96+
97+
# Check again after user makes selection in case execution completed
98+
if execution_complete and execution_complete.is_set():
99+
click.echo("⚠️ Execution already completed, callback no longer needed")
100+
return False
101+
102+
try:
103+
if choice == CHOICE_SUCCESS:
104+
result = click.prompt("Enter success result (optional)", default="", show_default=False)
105+
self.client.send_callback_success(callback_id=callback_id, result=result)
106+
click.echo("✅ Callback success sent")
107+
return True
108+
109+
elif choice == CHOICE_FAILURE:
110+
error_message = click.prompt("Enter error message", default="User cancelled")
111+
error_type = click.prompt("Enter error type (optional)", default="", show_default=False) or None
112+
113+
self.client.send_callback_failure(
114+
callback_id=callback_id, error_message=error_message, error_type=error_type
115+
)
116+
click.echo("❌ Callback failure sent")
117+
return True
118+
119+
elif choice == CHOICE_HEARTBEAT:
120+
self.client.send_callback_heartbeat(callback_id=callback_id)
121+
click.echo("💓 Callback heartbeat sent")
122+
return False # Continue waiting after heartbeat
123+
124+
else: # CHOICE_STOP
125+
error_message = click.prompt("Enter error message", default="Execution stopped by user")
126+
error_type = click.prompt("Enter error type (optional)", default="", show_default=False) or None
127+
128+
self.client.stop_durable_execution(
129+
durable_execution_arn=execution_arn, error_message=error_message, error_type=error_type
130+
)
131+
click.echo("🛑 Execution stopped")
132+
return True
133+
134+
except Exception as e:
135+
LOG.error("Failed to send callback: %s", e)
136+
click.echo(f"❌ Failed to send callback: {e}")
137+
return False

samcli/local/docker/durable_functions_emulator_container.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
from http import HTTPStatus
99
from pathlib import Path
10+
from tempfile import NamedTemporaryFile
1011
from typing import Optional
1112

1213
import docker
@@ -15,7 +16,8 @@
1516

1617
from samcli.lib.build.utils import _get_host_architecture
1718
from samcli.lib.clients.lambda_client import DurableFunctionsClient
18-
from samcli.local.docker.utils import get_validated_container_client, is_image_current
19+
from samcli.lib.utils.tar import create_tarball
20+
from samcli.local.docker.utils import get_tar_filter_for_windows, get_validated_container_client, is_image_current
1921

2022
LOG = logging.getLogger(__name__)
2123

@@ -27,6 +29,7 @@ class DurableFunctionsEmulatorContainer:
2729

2830
_RAPID_SOURCE_PATH = Path(__file__).parent.joinpath("..", "rapid").resolve()
2931
_EMULATOR_IMAGE = "public.ecr.aws/ubuntu/ubuntu:24.04"
32+
_EMULATOR_IMAGE_PREFIX = "samcli/durable-execution-emulator"
3033
_CONTAINER_NAME = "sam-durable-execution-emulator"
3134
_EMULATOR_DATA_DIR_NAME = ".durable-executions-local"
3235
_EMULATOR_DEFAULT_STORE_TYPE = "sqlite"
@@ -190,6 +193,62 @@ def _get_emulator_binary_name(self):
190193
arch = _get_host_architecture()
191194
return f"aws-durable-execution-emulator-{arch}"
192195

196+
def _generate_emulator_dockerfile(self, emulator_binary_name: str) -> str:
197+
"""Generate Dockerfile content for emulator image."""
198+
return (
199+
f"FROM {self._EMULATOR_IMAGE}\n"
200+
f"COPY {emulator_binary_name} /usr/local/bin/{emulator_binary_name}\n"
201+
f"RUN chmod +x /usr/local/bin/{emulator_binary_name}\n"
202+
)
203+
204+
def _get_emulator_image_tag(self, emulator_binary_name: str) -> str:
205+
"""Get the Docker image tag for the emulator."""
206+
return f"{self._EMULATOR_IMAGE_PREFIX}:{emulator_binary_name}"
207+
208+
def _build_emulator_image(self):
209+
"""Build Docker image with emulator binary."""
210+
emulator_binary_name = self._get_emulator_binary_name()
211+
binary_path = self._RAPID_SOURCE_PATH / emulator_binary_name
212+
213+
if not binary_path.exists():
214+
raise RuntimeError(f"Durable Functions Emulator binary not found at {binary_path}")
215+
216+
image_tag = self._get_emulator_image_tag(emulator_binary_name)
217+
218+
# Check if image already exists
219+
try:
220+
self._docker_client.images.get(image_tag)
221+
LOG.debug(f"Emulator image {image_tag} already exists")
222+
return image_tag
223+
except docker.errors.ImageNotFound:
224+
LOG.debug(f"Building emulator image {image_tag}")
225+
226+
# Generate Dockerfile content
227+
dockerfile_content = self._generate_emulator_dockerfile(emulator_binary_name)
228+
229+
# Write Dockerfile to temp location and build image
230+
with NamedTemporaryFile(mode="w", suffix="_Dockerfile") as dockerfile:
231+
dockerfile.write(dockerfile_content)
232+
dockerfile.flush()
233+
234+
# Prepare tar paths for build context
235+
tar_paths = {
236+
dockerfile.name: "Dockerfile",
237+
str(binary_path): emulator_binary_name,
238+
}
239+
240+
# Use shared tar filter for Windows compatibility
241+
tar_filter = get_tar_filter_for_windows()
242+
243+
# Build image using create_tarball utility
244+
with create_tarball(tar_paths, tar_filter=tar_filter, dereference=True) as tarballfile:
245+
try:
246+
self._docker_client.images.build(fileobj=tarballfile, custom_context=True, tag=image_tag, rm=True)
247+
LOG.info(f"Built emulator image {image_tag}")
248+
return image_tag
249+
except Exception as e:
250+
raise ClickException(f"Failed to build emulator image: {e}")
251+
193252
def _pull_image_if_needed(self):
194253
"""Pull the emulator image if it doesn't exist locally or is out of date."""
195254
try:
@@ -218,9 +277,6 @@ def start(self):
218277
return
219278

220279
emulator_binary_name = self._get_emulator_binary_name()
221-
binary_path = self._RAPID_SOURCE_PATH / emulator_binary_name
222-
if not binary_path.exists():
223-
raise RuntimeError(f"Durable Functions Emulator binary not found at {binary_path}")
224280

225281
"""
226282
Create persistent volume for execution data to be stored in.
@@ -231,16 +287,15 @@ def start(self):
231287
os.makedirs(emulator_data_dir, exist_ok=True)
232288

233289
volumes = {
234-
str(self._RAPID_SOURCE_PATH): {"bind": "/usr/local/bin", "mode": "ro"},
235290
emulator_data_dir: {"bind": "/tmp/.durable-executions-local", "mode": "rw"},
236291
}
237292

238-
# Pull the image if needed
239-
self._pull_image_if_needed()
293+
# Build image with emulator binary
294+
image_tag = self._build_emulator_image()
240295

241296
LOG.debug(f"Creating container with name={self._container_name}, port={self.port}")
242297
self.container = self._docker_client.containers.create(
243-
image=self._EMULATOR_IMAGE,
298+
image=image_tag,
244299
command=[f"/usr/local/bin/{emulator_binary_name}", "--host", "0.0.0.0", "--port", str(self.port)],
245300
name=self._container_name,
246301
ports={f"{self.port}/tcp": self.port},

samcli/local/docker/durable_lambda_container.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import click
1010
from flask import has_request_context
1111

12+
from samcli.lib.utils.durable_callback_handler import DurableCallbackHandler
1213
from samcli.lib.utils.durable_formatters import format_execution_details, format_next_commands_after_invoke
1314
from samcli.local.docker.lambda_container import LambdaContainer
1415

@@ -58,6 +59,9 @@ def _update_lambda_environment_with_emulator_endpoint(self, kwargs):
5859
extra_hosts["host.docker.internal"] = "host-gateway"
5960
kwargs["extra_hosts"] = extra_hosts
6061

62+
# Bind to 0.0.0.0 so emulator can reach Lambda via host.docker.internal
63+
kwargs["container_host_interface"] = "0.0.0.0"
64+
6165
def _get_lambda_container_endpoint(self):
6266
"""
6367
Get the Lambda container endpoint URL for the emulator to invoke.
@@ -146,8 +150,11 @@ def _write_execution_result_to_stdout(self, execution_details: dict, stdout):
146150
def _wait_for_execution(self, execution_arn):
147151
"""Poll the execution status until completion and return the final result."""
148152

149-
# TODO - poll until the execution timeout is hit
153+
callback_handler = DurableCallbackHandler(self.emulator_container.lambda_client)
150154
execution_details = None
155+
callback_thread = None
156+
stop_callback_prompts = threading.Event()
157+
151158
try:
152159
while True:
153160
try:
@@ -156,13 +163,36 @@ def _wait_for_execution(self, execution_arn):
156163
status = execution_details.get("Status")
157164

158165
if status != "RUNNING":
166+
stop_callback_prompts.set() # Signal callback thread to stop
167+
if callback_thread and callback_thread.is_alive():
168+
callback_thread.join(timeout=0.5) # Brief wait for thread cleanup
159169
return execution_details
160170

171+
# Check for pending callbacks (only in CLI context)
172+
if self._is_cli_context():
173+
callback_id = callback_handler.check_for_pending_callbacks(execution_arn)
174+
if callback_id:
175+
176+
def _prompt_in_thread():
177+
if not stop_callback_prompts.is_set():
178+
# give the function logs time to settle after the invocation is suspended
179+
time.sleep(0.5)
180+
callback_sent = callback_handler.prompt_callback_response(
181+
execution_arn, callback_id, stop_callback_prompts
182+
)
183+
if callback_sent:
184+
click.echo("\n" + "─" * 80)
185+
186+
# Start callback prompt in separate thread so it doesn't block polling
187+
callback_thread = threading.Thread(target=_prompt_in_thread, daemon=True)
188+
callback_thread.start()
189+
161190
time.sleep(1) # Poll every second
162191
except Exception as e:
163192
LOG.error("Error polling execution status: %s", e)
164193
break
165194
finally:
195+
stop_callback_prompts.set() # Ensure callback thread knows to stop
166196
self._cleanup_if_needed()
167197

168198
return execution_details

samcli/local/docker/lambda_image.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
from samcli.lib.utils.stream_writer import StreamWriter
2727
from samcli.lib.utils.tar import create_tarball
2828
from samcli.local.common.file_lock import FileLock, cleanup_stale_locks
29-
from samcli.local.docker.utils import get_docker_platform, get_rapid_name, get_validated_container_client
29+
from samcli.local.docker.utils import (
30+
get_docker_platform,
31+
get_rapid_name,
32+
get_tar_filter_for_windows,
33+
get_validated_container_client,
34+
)
3035

3136
LOG = logging.getLogger(__name__)
3237

@@ -409,15 +414,8 @@ def _build_image(self, base_image, docker_tag, layers, architecture, stream=None
409414
for layer in layers:
410415
tar_paths[layer.codeuri] = "/" + layer.name
411416

412-
# Set permission for all the files in the tarball to 500(Read and Execute Only)
413-
# This is need for systems without unix like permission bits(Windows) while creating a unix image
414-
# Without setting this explicitly, tar will default the permission to 666 which gives no execute permission
415-
def set_item_permission(tar_info):
416-
tar_info.mode = 0o500
417-
return tar_info
418-
419-
# Set only on Windows, unix systems will preserve the host permission into the tarball
420-
tar_filter = set_item_permission if platform.system().lower() == "windows" else None
417+
# Use shared tar filter for Windows compatibility
418+
tar_filter = get_tar_filter_for_windows()
421419

422420
with create_tarball(tar_paths, tar_filter=tar_filter, dereference=True) as tarballfile:
423421
try:

0 commit comments

Comments
 (0)