Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pypdf/generic/_data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def clone(
)
if ignore_fields is None:
ignore_fields = []
if len(d__.keys()) == 0:
if not d__ and hasattr(d__, "_clone"):
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

Expand Down Expand Up @@ -1242,7 +1242,8 @@ def clone(
)
if ignore_fields is None:
ignore_fields = []
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
if hasattr(d__, "_clone"):
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

def _clone(
Expand Down
269 changes: 269 additions & 0 deletions tests/generic/test_array_based_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
"""Test the watermarking functionality and array based input handling in pypdf."""
from io import BytesIO

import pytest

from pypdf import PdfReader, PdfWriter
from pypdf.generic import (
ArrayObject,
DictionaryObject,
NameObject,
NumberObject,
StreamObject,
)
from pypdf.generic._base import IndirectObject
from pypdf.generic._data_structures import ContentStream


@pytest.fixture
def create_pdf_writer():
"""Provides a simple PdfWriter instance."""
return PdfWriter()

@pytest.fixture
def blank_pdf_writer():
"""Returns a PdfWriter with one blank 100x100 page."""
writer = PdfWriter()
writer.add_blank_page(width=100, height=100)
return writer

@pytest.fixture
def content_stream_with_data():
"""Create a ContentStream with actual data and operations."""
stream = StreamObject()
stream.set_data(b"BT /F1 12 Tf 50 50 Td (Hello World) Tj ET")

# Create a ContentStream from the stream
content_stream = ContentStream(stream, None)

# Add some dictionary entries to simulate real content stream
content_stream[NameObject("/Filter")] = NameObject("/FlateDecode")
content_stream[NameObject("/Length")] = NumberObject(len(stream.get_data()))

return content_stream

@pytest.fixture
def array_content_pdf_bytes():
"""Returns a PDF with a page that has an ArrayObject for its /Contents entry."""
writer = PdfWriter()
page = writer.add_blank_page(width=100, height=100)

# Create two minimal content streams
stream1 = StreamObject()
stream1.set_data(b"0 0 m 10 10 l S") # Simple path

stream2 = StreamObject()
stream2.set_data(b"BT /F0 10 Tf 20 20 Td (Test) Tj ET") # Simple text

# Set the page /Contents to an ArrayObject of streams
page[NameObject("/Contents")] = ArrayObject([
stream1,
stream2,
])

output = BytesIO()
writer.write(output)
return output.getvalue()


@pytest.fixture
def watermark_pdf_bytes(blank_pdf_writer):
"""
Returns a simple PDF with a page containing a ProcSet ArrayObject and
a valid StreamObject for /Contents, ensuring the merging logic has
the correct structures to clone and process.
"""
writer = blank_pdf_writer # Use the writer from the fixture for consistency
page = writer.pages[0]

resources = DictionaryObject()
resources[NameObject("/ProcSet")] = ArrayObject([
NameObject("/PDF"), NameObject("/Text")
])
resources[NameObject("/Font")] = DictionaryObject()

# Update the page to use this custom resource dictionary
page[NameObject("/Resources")] = resources

content_bytes = b"BT /F1 12 Tf 0 0 Td (Watermark) Tj ET"
content_stream = StreamObject()

# Use set_data() to properly assign the byte content
content_stream.set_data(content_bytes)

# A valid PdfObject value for a DictionaryObject key.
page[NameObject("/Contents")] = content_stream

# Write the mock PDF bytes from the writer containing the single page
output = BytesIO()
writer.write(output)
return output.getvalue()


def test_merge_page_with_array_procset_clones_correctly(
blank_pdf_writer, watermark_pdf_bytes
):
"""
Tests that a page with an ArrayObject (specifically for /ProcSet) in its
resources dictionary can be successfully cloned and merged via merge_page.
This simulates the scenario that triggered the ArrayObject AttributeError.
"""
watermark_reader = PdfReader(BytesIO(watermark_pdf_bytes))
watermark_page = watermark_reader.pages[0]

page = blank_pdf_writer.pages[0]

# The cloning operation happens inside the merge_page call
try:
page.merge_page(watermark_page)
except Exception as e:
pytest.fail(
f"Failed to merge page with ArrayObject in Resources. Error: {e}"
)

# Final checks after merge
# The original page should now have the resources and contents from the watermark
if NameObject("/Contents") not in page:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a test with controlled inputs. Why do we need this condition?

# If the page has no content stream yet, initialize it as an array
page[NameObject("/Contents")] = ArrayObject()
if not isinstance(page[NameObject("/Contents")].get_object(), ArrayObject):
pytest.fail("Page /Contents should have been converted to an ArrayObject after merge.")

assert NameObject("/Resources") in page
assert NameObject("/Contents") in page

# Final check by writing and reading back
output = BytesIO()
blank_pdf_writer.write(output)

reader = PdfReader(output)
assert len(reader.pages) == 1

resources_obj = page["/Resources"].get_object()
assert len(resources_obj.get("/ProcSet", [])) > 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we able to check exact values here?



def test_watermark_preserves_original_page(blank_pdf_writer, watermark_pdf_bytes):
"""
Ensures that applying a watermark does not modify the original page
used as the watermark source (it should be cloned via deep copy).
"""
watermark_reader = PdfReader(BytesIO(watermark_pdf_bytes))
watermark_page = watermark_reader.pages[0]

# Store an original property before cloning
original_media_box = watermark_page[NameObject("/MediaBox")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
original_media_box = watermark_page[NameObject("/MediaBox")]
original_media_box = watermark_page.mediabox


page = blank_pdf_writer.pages[0]
page.merge_page(watermark_page)

# Check if the original watermark page object's properties are unchanged
assert watermark_page[NameObject("/MediaBox")] == original_media_box

# Attempt to modify the cloned page, and check if the original is safe
page[NameObject("/Type")] = NameObject("/Modified")

# The type in the original page should still be /Page
assert watermark_page[NameObject("/Type")] == NameObject("/Page")


def test_add_page_with_array_content_stream_succeeds(array_content_pdf_bytes):
"""
Tests that adding a page where /Contents is an ArrayObject succeeds without
raising an error during the deep copy process within PdfWriter.add_page.
"""
reader = PdfReader(BytesIO(array_content_pdf_bytes))
source_page = reader.pages[0]

# Get the contents array and ensure we are working with the resolved object
content_array = source_page.get(NameObject("/Contents")).get_object()

# test presence of indirect_reference attribute
if isinstance(content_array, ArrayObject):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this heavy construct for controlled inputs?

for i, stream_obj in enumerate(content_array):
if hasattr(stream_obj, "indirect_reference"):
# Check if it already has a reference, if not, assign one
if stream_obj.indirect_reference is None:
# Assign a dummy IndirectObject that points back to itself
stream_obj.indirect_reference = IndirectObject(i + 100, 0, reader)
else:
# Try to assign the attribute if it's missing.
stream_obj.indirect_reference = IndirectObject(i + 100, 0, reader)

new_writer = PdfWriter()

try:
new_writer.add_page(source_page)
output = BytesIO()
new_writer.write(output)

except Exception as e:
pytest.fail(
f"Failed to add/write page with ArrayObject content stream. Error: {e}"
)

# check the structure of the resulting PDF
final_reader = PdfReader(output)
assert len(final_reader.pages) == 1

# Check that the /Contents stream is present and valid.
content = final_reader.pages[0].get_contents()

assert content is not None

# Check that the process completed successfully and resulted in a valid object type
assert isinstance(content, (StreamObject, ArrayObject))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we know which of these it is?


def test_populated_stream_deep_copy(create_pdf_writer):
"""
Tests that a populated StreamObject with dictionary keys and data is
correctly deep-cloned, ensuring the raw data is copied and isolated.
This covers the standard cloning path for StreamObjects.
"""
original_data = b"BT /F0 12 Tf 50 50 Td (Test Content) Tj ET"

#Create a populated StreamObject (which is truthy: len() > 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#Create a populated StreamObject (which is truthy: len() > 0)
# Create a populated StreamObject (which is truthy: len() > 0)

populated_stream = StreamObject()
populated_stream.set_data(original_data)

#Convert the Python integer len(original_data) to a PdfObject (NumberObject)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is superfluous.

populated_stream[NameObject("/Length")] = NumberObject(len(original_data))
populated_stream[NameObject("/Filter")] = NameObject("/FlateDecode")

assert len(populated_stream) > 0 # Not falsy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert len(populated_stream) > 0 # Not falsy
assert len(populated_stream) > 0 # Not falsy

Additionally, shouldn't we know the exact length?

assert populated_stream.get_data() == original_data

# Create a container dictionary
container_dict = DictionaryObject({
NameObject("/Populated"): populated_stream,
NameObject("/Metadata"): NameObject("/Info")
})

# Clone the container to a new writer (forcing deep copy).
new_writer = create_pdf_writer
cloned_container = container_dict.clone(pdf_dest=new_writer, force_duplicate=True)

# Check results
cloned_stream = cloned_container[NameObject("/Populated")]
assert cloned_stream is not populated_stream
assert cloned_stream[NameObject("/Filter")] == NameObject("/FlateDecode")
assert cloned_stream.get_data() == original_data
populated_stream.set_data(b"NEW MODIFIED DATA")

# Change dictionary key
populated_stream[NameObject("/Filter")] = NameObject("/LZWDecode")

# Clone should retain original values
assert cloned_stream.get_data() == original_data
assert cloned_stream[NameObject("/Filter")] == NameObject("/FlateDecode")

def test_contentstream_clone_has_clone_method(create_pdf_writer, content_stream_with_data):
"""Test that ContentStream has a _clone method and it's called during clone()."""
# Verify ContentStream has _clone method
assert hasattr(ContentStream, "_clone")
pdf_writer = create_pdf_writer
# Clone and verify it completes successfully
cloned = content_stream_with_data.clone(pdf_writer, force_duplicate=True)
# The _clone method should have been called, so verify the clone is valid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen without the _clone method?

assert isinstance(cloned, ContentStream)
assert cloned.pdf == pdf_writer
Loading