Skip to content

Commit 2ed6f72

Browse files
authored
Save input shard lengths (#7897)
* save input_shard_lengths in split info * update all builders to count input shards * style * fix tests * remove unused keys dedupe * again * again * last one * input_shard_length -> original_shard_length * add Key + typing * fix ci
1 parent 03fe070 commit 2ed6f72

File tree

26 files changed

+206
-372
lines changed

26 files changed

+206
-372
lines changed

src/datasets/arrow_writer.py

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import json
1717
import sys
1818
from collections.abc import Iterable
19-
from typing import Any, Optional, Union
19+
from typing import Any, Optional
2020

2121
import fsspec
2222
import numpy as np
@@ -40,7 +40,6 @@
4040
)
4141
from .filesystems import is_remote_filesystem
4242
from .info import DatasetInfo
43-
from .keyhash import DuplicatedKeysError, KeyHasher
4443
from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
4544
from .utils import logging
4645
from .utils.py_utils import asdict, convert_file_size_to_int, first_non_null_non_empty_value
@@ -414,8 +413,6 @@ def __init__(
414413
stream: Optional[pa.NativeFile] = None,
415414
fingerprint: Optional[str] = None,
416415
writer_batch_size: Optional[int] = None,
417-
hash_salt: Optional[str] = None,
418-
check_duplicates: Optional[bool] = False,
419416
disable_nullable: bool = False,
420417
update_features: bool = False,
421418
with_metadata: bool = True,
@@ -435,13 +432,6 @@ def __init__(
435432
self._features = None
436433
self._schema = None
437434

438-
if hash_salt is not None:
439-
# Create KeyHasher instance using split name as hash salt
440-
self._hasher = KeyHasher(hash_salt)
441-
else:
442-
self._hasher = KeyHasher("")
443-
444-
self._check_duplicates = check_duplicates
445435
self._disable_nullable = disable_nullable
446436

447437
if stream is None:
@@ -592,51 +582,21 @@ def write_rows_on_file(self):
592582
def write(
593583
self,
594584
example: dict[str, Any],
595-
key: Optional[Union[str, int, bytes]] = None,
596585
writer_batch_size: Optional[int] = None,
597586
):
598587
"""Add a given (Example,Key) pair to the write-pool of examples which is written to file.
599588
600589
Args:
601590
example: the Example to add.
602-
key: Optional, a unique identifier(str, int or bytes) associated with each example
603591
"""
604-
# Utilize the keys and duplicate checking when `self._check_duplicates` is passed True
605-
if self._check_duplicates:
606-
# Create unique hash from key and store as (key, example) pairs
607-
hash = self._hasher.hash(key)
608-
self.current_examples.append((example, hash))
609-
# Maintain record of keys and their respective hashes for checking duplicates
610-
self.hkey_record.append((hash, key))
611-
else:
612-
# Store example as a tuple so as to keep the structure of `self.current_examples` uniform
613-
self.current_examples.append((example, ""))
592+
# Store example as a tuple so as to keep the structure of `self.current_examples` uniform
593+
self.current_examples.append((example, ""))
614594

615595
if writer_batch_size is None:
616596
writer_batch_size = self.writer_batch_size
617597
if writer_batch_size is not None and len(self.current_examples) >= writer_batch_size:
618-
if self._check_duplicates:
619-
self.check_duplicate_keys()
620-
# Re-initializing to empty list for next batch
621-
self.hkey_record = []
622-
623598
self.write_examples_on_file()
624599

625-
def check_duplicate_keys(self):
626-
"""Raises error if duplicates found in a batch"""
627-
tmp_record = set()
628-
for hash, key in self.hkey_record:
629-
if hash in tmp_record:
630-
duplicate_key_indices = [
631-
str(self._num_examples + index)
632-
for index, (duplicate_hash, _) in enumerate(self.hkey_record)
633-
if duplicate_hash == hash
634-
]
635-
636-
raise DuplicatedKeysError(key, duplicate_key_indices)
637-
else:
638-
tmp_record.add(hash)
639-
640600
def write_row(self, row: pa.Table, writer_batch_size: Optional[int] = None):
641601
"""Add a given single-row Table to the write-pool of rows which is written to file.
642602
@@ -721,10 +681,6 @@ def write_table(self, pa_table: pa.Table, writer_batch_size: Optional[int] = Non
721681
def finalize(self, close_stream=True):
722682
self.write_rows_on_file()
723683
# In case current_examples < writer_batch_size, but user uses finalize()
724-
if self._check_duplicates:
725-
self.check_duplicate_keys()
726-
# Re-initializing to empty list for next batch
727-
self.hkey_record = []
728684
self.write_examples_on_file()
729685
# If schema is known, infer features even if no examples were written
730686
if self.pa_writer is None and self.schema:

0 commit comments

Comments
 (0)