rg.Dataset.records

Usage Examples

In most cases, you will not need to create a DatasetRecords object directly. Instead, you can access it via the Dataset object:
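
For example (a minimal sketch; the server URL, API key, and dataset name are illustrative, and retrieving a dataset via client.datasets(...) is assumed):

import argilla_sdk as rg

client = rg.Argilla(api_url="<api_url>", api_key="<api_key>")
dataset = client.datasets("my_dataset")  # assumed retrieval helper; name is illustrative
dataset.records  # the DatasetRecords interface for this dataset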

Adding records to a dataset

To add records to a dataset, use the add method. Records can be passed as dictionaries or as Record objects, either individually or as a list.

# Add records to a dataset
dataset.records.add(
    records=[
        {
            "question": "What is the capital of France?",  # 'question' matches the `rg.TextField` name
            "answer": "Paris",  # 'answer' matches the `rg.TextQuestion` name
        },
        {
            "question": "What is the capital of Germany?",
            "answer": "Berlin",
        },
    ]
)

When adding records from a native data source, a mapping can be provided to map the keys in the native data structure to the fields and questions in Argilla. Dot notation can be used to target suggestions and responses in the records, as sketched in the second example below.

dataset.records.add(
    records=[
        {"input": "What is the capital of France?", "output": "Paris"},
        {"input": "What is the capital of Germany?", "output": "Berlin"},
    ],
    mapping={"input": "question", "output": "answer"}, # Maps 'input' to 'question' and 'output' to 'answer'
)
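
For instance, dot notation could route a raw key to a question's suggestion rather than a response (a sketch; the answer.suggestion target assumes the dot-notation form described above):

dataset.records.add(
    records=[
        {"input": "What is the capital of Italy?", "model_output": "Rome"},
    ],
    mapping={"input": "question", "model_output": "answer.suggestion"},  # assumed dot-notation target
)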

Iterating over records in a dataset

Dataset.records can be used to iterate over the records in a dataset on the server. The records are fetched in batches:

for record in dataset.records:
    print(record)

# Fetch records with suggestions and responses
for record in dataset.records(with_suggestions=True, with_responses=True):
    print(record.suggestions)
    print(record.responses)

# Filter records by a query and fetch records with vectors
for record in dataset.records(query="capital", with_vectors=True):
    print(record.vectors)
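
A Query object can also be passed in place of a query string (a sketch, using the Query(query=...) constructor shown in the class source below and assuming rg.Query is exported at the package root):

for record in dataset.records(query=rg.Query(query="capital")):
    print(record)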

Check out the rg.Record class reference for more information on the properties and methods available on a record, and the rg.Query class reference for more information on the query syntax.

Updating records in a dataset

Records can also be updated using the id or external_id to identify the records to be updated. Note that update behaves as an upsert: records whose id does not match an existing record are added as new records:

# Add records to a dataset
dataset.records.add(
    records=[
        {
            "id": "1",
            "question": "What is the capital of France?",
            "answer": "F",
        },
        {
            "id": "2",
            "question": "What is the capital of Germany?",
            "answer": "Berlin"
        },
    ]
)

# Update records in a dataset
dataset.records.update(
    records=[
        {
            "id": "1",  # matches id used in `Dataset.records.add`
            "question": "What is the capital of France?",
            "answer": "Paris",
        }
    ]
)

Exporting records from a dataset

Records can also be exported from Dataset.records. Generic Python exports are available through the to_dict and to_list methods.

dataset.records.to_dict()
# {"text": ["Hello", "World"], "label": ["greeting", "greeting"]}

dataset.records.to_list()
# [{"text": "Hello", "label": "greeting"}, {"text": "World", "label": "greeting"}]

Class Reference

rg.Dataset.records

Bases: Iterable[Record], LoggingMixin

This class is used to work with records from a dataset and is accessed via Dataset.records. The responsibility of this class is to provide an interface to interact with records in a dataset, by adding, updating, fetching, querying, deleting, and exporting records.

Attributes:

    client (Argilla): The Argilla client object.
    dataset (Dataset): The dataset object.

Source code in src/argilla_sdk/records/_dataset_records.py
class DatasetRecords(Iterable[Record], LoggingMixin):
    """This class is used to work with records from a dataset and is accessed via `Dataset.records`.
    The responsibility of this class is to provide an interface to interact with records in a dataset,
    by adding, updating, fetching, querying, deleting, and exporting records.

    Attributes:
        client (Argilla): The Argilla client object.
        dataset (Dataset): The dataset object.
    """

    _api: RecordsAPI

    DEFAULT_BATCH_SIZE = 256

    def __init__(self, client: "Argilla", dataset: "Dataset"):
        """Initializes a DatasetRecords object with a client and a dataset.
        Args:
            client: An Argilla client object.
            dataset: A Dataset object.
        """
        self.__client = client
        self.__dataset = dataset
        self._api = self.__client.api.records

    def __iter__(self):
        return DatasetRecordsIterator(self.__dataset, self.__client)

    def __call__(
        self,
        query: Optional[Union[str, Query]] = None,
        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
        start_offset: int = 0,
        with_suggestions: bool = True,
        with_responses: bool = True,
        with_vectors: Optional[Union[List, bool, str]] = None,
    ):
        """Returns an iterator over the records in the dataset on the server.

        Parameters:
            query: A string or a Query object to filter the records.
            batch_size: The number of records to fetch in each batch. The default is 256.
            start_offset: The offset from which to start fetching records. The default is 0.
            with_suggestions: Whether to include suggestions in the records. The default is True.
            with_responses: Whether to include responses in the records. The default is True.
            with_vectors: A list of vector names to include in the records. The default is None.
                If a list is provided, only the specified vectors will be included.
                If True is provided, all vectors will be included.

        Returns:
            An iterator over the records in the dataset on the server.

        """
        if query and isinstance(query, str):
            query = Query(query=query)

        if with_vectors:
            self.__validate_vector_names(vector_names=with_vectors)

        return DatasetRecordsIterator(
            self.__dataset,
            self.__client,
            query=query,
            batch_size=batch_size,
            start_offset=start_offset,
            with_suggestions=with_suggestions,
            with_responses=with_responses,
            with_vectors=with_vectors,
        )

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.__dataset})"

    ############################
    # Public methods
    ############################

    def add(
        self,
        records: Union[dict, List[dict], Record, List[Record]],
        mapping: Optional[Dict[str, str]] = None,
        user_id: Optional[UUID] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> List[Record]:
        """
        Add new records to a dataset on the server.

        Parameters:
            records: A dictionary or a list of dictionaries representing the records
                     to be added to the dataset. Records are defined as dictionaries
                     with keys corresponding to the fields in the dataset schema.
            mapping: A dictionary that maps the keys in the records to the fields in the dataset schema.
            user_id: The user id to be associated with the records. If not provided, the current user id is used.
            batch_size: The number of records to send in each batch. The default is 256.

        Returns:
            A list of Record objects representing the added records.

        Examples:

        Add generic records to a dataset as dictionaries:

        """
        record_models = self.__ingest_records(records=records, mapping=mapping, user_id=user_id or self.__client.me.id)

        batch_size = self._normalize_batch_size(
            batch_size=batch_size,
            records_length=len(record_models),
            max_value=self._api.MAX_RECORDS_PER_CREATE_BULK,
        )

        created_records = []
        for batch in range(0, len(record_models), batch_size):
            self.log(message=f"Sending records from {batch} to {batch + batch_size}.")
            batch_records = record_models[batch : batch + batch_size]
            models = self._api.bulk_create(dataset_id=self.__dataset.id, records=batch_records)
            created_records.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])

        self.log(
            message=f"Added {len(created_records)} records to dataset {self.__dataset.name}",
            level="info",
        )

        return created_records

    def update(
        self,
        records: Union[dict, List[dict], Record, List[Record]],
        mapping: Optional[Dict[str, str]] = None,
        user_id: Optional[UUID] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> List[Record]:
        """Update records in a dataset on the server using the provided records
            and matching based on the external_id or id.

        Parameters:
            records: A dictionary or a list of dictionaries representing the records
                     to be updated in the dataset. Records are defined as dictionaries
                     with keys corresponding to the fields in the dataset schema. Ids or
                     external_ids should be provided to identify the records to be updated.
            mapping: A dictionary that maps the keys in the records to the fields in the dataset schema.
            user_id: The user id to be associated with the records. If not provided, the current user id is used.
            batch_size: The number of records to send in each batch. The default is 256.

        Returns:
            A list of Record objects representing the updated records.

        """
        record_models = self.__ingest_records(records=records, mapping=mapping, user_id=user_id or self.__client.me.id)
        batch_size = self._normalize_batch_size(
            batch_size=batch_size,
            records_length=len(record_models),
            max_value=self._api.MAX_RECORDS_PER_UPSERT_BULK,
        )

        created_or_updated = []
        records_updated = 0
        for batch in range(0, len(record_models), batch_size):
            self.log(message=f"Sending records from {batch} to {batch + batch_size}.")
            batch_records = record_models[batch : batch + batch_size]
            models, updated = self._api.bulk_upsert(dataset_id=self.__dataset.id, records=batch_records)
            created_or_updated.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])
            records_updated += updated

        records_created = len(created_or_updated) - records_updated
        self.log(
            message=f"Updated {records_updated} records and added {records_created} records to dataset {self.__dataset.name}",
            level="info",
        )

        return created_or_updated

    def to_dict(self, flatten: bool = False, orient: str = "names") -> Dict[str, Any]:
        """
        Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

        Parameters:
            flatten (bool): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.
            orient (str): The structure of the exported dictionary.

        Returns:
            A dictionary of records.

        """
        return self(with_suggestions=True, with_responses=True).to_dict(flatten=flatten, orient=orient)

    def to_list(self, flatten: bool = False) -> List[Dict[str, Any]]:
        """
        Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

        Parameters:
            flatten (bool): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.

        Returns:
            A list of dictionaries of records.
        """
        return self(with_suggestions=True, with_responses=True).to_list(flatten=flatten)

    ############################
    # Private methods
    ############################

    def __ingest_records(
        self,
        records: Union[List[Dict[str, Any]], Dict[str, Any], List[Record], Record],
        mapping: Optional[Dict[str, str]] = None,
        user_id: Optional[UUID] = None,
    ) -> List[RecordModel]:
        if isinstance(records, (Record, dict)):
            records = [records]

        if all(map(lambda r: isinstance(r, dict), records)):
            # Records as flat dicts of values to be matched to questions as suggestion or response
            records = [
                Record.from_dict(data=r, mapping=mapping, dataset=self.__dataset, user_id=user_id) for r in records
            ]  # type: ignore
        elif all(map(lambda r: isinstance(r, Record), records)):
            for record in records:
                record.dataset = self.__dataset
        else:
            raise ValueError(
                "Records should be a dictionary, a list of dictionaries, a Record instance, "
                "or a list of Record instances."
            )
        return [record.api_model() for record in records]

    def _normalize_batch_size(self, batch_size: int, records_length, max_value: int):
        norm_batch_size = min(batch_size, records_length, max_value)

        if batch_size != norm_batch_size:
            self.log(
                message=f"The provided batch size {batch_size} was normalized. Using value {norm_batch_size}.",
                level="warning",
            )

        return norm_batch_size

    def __validate_vector_names(self, vector_names: Union[List[str], str]) -> None:
        if not isinstance(vector_names, list):
            vector_names = [vector_names]
        for vector_name in vector_names:
            if isinstance(vector_name, bool):
                continue
            if vector_name not in self.__dataset.schema:
                raise ValueError(f"Vector field {vector_name} not found in dataset schema.")

__call__(query=None, batch_size=DEFAULT_BATCH_SIZE, start_offset=0, with_suggestions=True, with_responses=True, with_vectors=None)

Returns an iterator over the records in the dataset on the server.

Parameters:

    query (Optional[Union[str, Query]], default None): A string or a Query object to filter the records.
    batch_size (Optional[int], default DEFAULT_BATCH_SIZE): The number of records to fetch in each batch. The default is 256.
    start_offset (int, default 0): The offset from which to start fetching records.
    with_suggestions (bool, default True): Whether to include suggestions in the records.
    with_responses (bool, default True): Whether to include responses in the records.
    with_vectors (Optional[Union[List, bool, str]], default None): A list of vector names to include in the records. If a list is provided, only the specified vectors will be included. If True is provided, all vectors will be included.

Returns:

    An iterator over the records in the dataset on the server.
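
For example, records can be fetched in smaller batches while skipping an initial offset (a minimal sketch using the parameters above):

# Fetch in batches of 100, skipping the first 200 records
for record in dataset.records(batch_size=100, start_offset=200):
    print(record)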

Source code in src/argilla_sdk/records/_dataset_records.py
def __call__(
    self,
    query: Optional[Union[str, Query]] = None,
    batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
    start_offset: int = 0,
    with_suggestions: bool = True,
    with_responses: bool = True,
    with_vectors: Optional[Union[List, bool, str]] = None,
):
    """Returns an iterator over the records in the dataset on the server.

    Parameters:
        query: A string or a Query object to filter the records.
        batch_size: The number of records to fetch in each batch. The default is 256.
        start_offset: The offset from which to start fetching records. The default is 0.
        with_suggestions: Whether to include suggestions in the records. The default is True.
        with_responses: Whether to include responses in the records. The default is True.
        with_vectors: A list of vector names to include in the records. The default is None.
            If a list is provided, only the specified vectors will be included.
            If True is provided, all vectors will be included.

    Returns:
        An iterator over the records in the dataset on the server.

    """
    if query and isinstance(query, str):
        query = Query(query=query)

    if with_vectors:
        self.__validate_vector_names(vector_names=with_vectors)

    return DatasetRecordsIterator(
        self.__dataset,
        self.__client,
        query=query,
        batch_size=batch_size,
        start_offset=start_offset,
        with_suggestions=with_suggestions,
        with_responses=with_responses,
        with_vectors=with_vectors,
    )

__init__(client, dataset)

Initializes a DatasetRecords object with a client and a dataset.

Parameters:

    client: An Argilla client object.
    dataset: A Dataset object.

Source code in src/argilla_sdk/records/_dataset_records.py
def __init__(self, client: "Argilla", dataset: "Dataset"):
    """Initializes a DatasetRecords object with a client and a dataset.
    Args:
        client: An Argilla client object.
        dataset: A Dataset object.
    """
    self.__client = client
    self.__dataset = dataset
    self._api = self.__client.api.records

add(records, mapping=None, user_id=None, batch_size=DEFAULT_BATCH_SIZE)

Add new records to a dataset on the server.

Parameters:

    records (Union[dict, List[dict], Record, List[Record]], required): A dictionary or a list of dictionaries representing the records to be added to the dataset. Records are defined as dictionaries with keys corresponding to the fields in the dataset schema.
    mapping (Optional[Dict[str, str]], default None): A dictionary that maps the keys in the records to the fields in the dataset schema.
    user_id (Optional[UUID], default None): The user id to be associated with the records. If not provided, the current user id is used.
    batch_size (int, default DEFAULT_BATCH_SIZE): The number of records to send in each batch. The default is 256.

Returns:

    List[Record]: A list of Record objects representing the added records.

Examples:

Add generic records to a dataset as dictionaries:
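
dataset.records.add(
    records=[
        {
            "question": "What is the capital of France?",  # matches a field name in the dataset schema
            "answer": "Paris",  # matches a question name
        },
    ]
)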

Source code in src/argilla_sdk/records/_dataset_records.py
def add(
    self,
    records: Union[dict, List[dict], Record, List[Record]],
    mapping: Optional[Dict[str, str]] = None,
    user_id: Optional[UUID] = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> List[Record]:
    """
    Add new records to a dataset on the server.

    Parameters:
        records: A dictionary or a list of dictionaries representing the records
                 to be added to the dataset. Records are defined as dictionaries
                 with keys corresponding to the fields in the dataset schema.
        mapping: A dictionary that maps the keys in the records to the fields in the dataset schema.
        user_id: The user id to be associated with the records. If not provided, the current user id is used.
        batch_size: The number of records to send in each batch. The default is 256.

    Returns:
        A list of Record objects representing the added records.

    Examples:

    Add generic records to a dataset as dictionaries:

    """
    record_models = self.__ingest_records(records=records, mapping=mapping, user_id=user_id or self.__client.me.id)

    batch_size = self._normalize_batch_size(
        batch_size=batch_size,
        records_length=len(record_models),
        max_value=self._api.MAX_RECORDS_PER_CREATE_BULK,
    )

    created_records = []
    for batch in range(0, len(record_models), batch_size):
        self.log(message=f"Sending records from {batch} to {batch + batch_size}.")
        batch_records = record_models[batch : batch + batch_size]
        models = self._api.bulk_create(dataset_id=self.__dataset.id, records=batch_records)
        created_records.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])

    self.log(
        message=f"Added {len(created_records)} records to dataset {self.__dataset.name}",
        level="info",
    )

    return created_records

to_dict(flatten=False, orient='names')

Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

Parameters:

    flatten (bool, default False): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.
    orient (str, default 'names'): The structure of the exported dictionary.

Returns:

    Dict[str, Any]: A dictionary of records.

Source code in src/argilla_sdk/records/_dataset_records.py
def to_dict(self, flatten: bool = False, orient: str = "names") -> Dict[str, Any]:
    """
    Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

    Parameters:
        flatten (bool): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.
        orient (str): The structure of the exported dictionary.

    Returns:
        A dictionary of records.

    """
    return self(with_suggestions=True, with_responses=True).to_dict(flatten=flatten, orient=orient)

to_list(flatten=False)

Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

Parameters:

    flatten (bool, default False): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.

Returns:

    List[Dict[str, Any]]: A list of dictionaries of records.

Source code in src/argilla_sdk/records/_dataset_records.py
def to_list(self, flatten: bool = False) -> List[Dict[str, Any]]:
    """
    Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

    Parameters:
        flatten (bool): Whether to flatten the dictionary and use dot notation for nested keys like suggestions and responses.

    Returns:
        A list of dictionaries of records.
    """
    return self(with_suggestions=True, with_responses=True).to_list(flatten=flatten)

update(records, mapping=None, user_id=None, batch_size=DEFAULT_BATCH_SIZE)

Update records in a dataset on the server using the provided records and matching based on the external_id or id.

Parameters:

    records (Union[dict, List[dict], Record, List[Record]], required): A dictionary or a list of dictionaries representing the records to be updated in the dataset. Records are defined as dictionaries with keys corresponding to the fields in the dataset schema. Ids or external_ids should be provided to identify the records to be updated.
    mapping (Optional[Dict[str, str]], default None): A dictionary that maps the keys in the records to the fields in the dataset schema.
    user_id (Optional[UUID], default None): The user id to be associated with the records. If not provided, the current user id is used.
    batch_size (int, default DEFAULT_BATCH_SIZE): The number of records to send in each batch. The default is 256.

Returns:

    List[Record]: A list of Record objects representing the updated records.

Source code in src/argilla_sdk/records/_dataset_records.py
def update(
    self,
    records: Union[dict, List[dict], Record, List[Record]],
    mapping: Optional[Dict[str, str]] = None,
    user_id: Optional[UUID] = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> List[Record]:
    """Update records in a dataset on the server using the provided records
        and matching based on the external_id or id.

    Parameters:
        records: A dictionary or a list of dictionaries representing the records
                 to be updated in the dataset. Records are defined as dictionaries
                 with keys corresponding to the fields in the dataset schema. Ids or
                 external_ids should be provided to identify the records to be updated.
        mapping: A dictionary that maps the keys in the records to the fields in the dataset schema.
        user_id: The user id to be associated with the records. If not provided, the current user id is used.
        batch_size: The number of records to send in each batch. The default is 256.

    Returns:
        A list of Record objects representing the updated records.

    """
    record_models = self.__ingest_records(records=records, mapping=mapping, user_id=user_id or self.__client.me.id)
    batch_size = self._normalize_batch_size(
        batch_size=batch_size,
        records_length=len(record_models),
        max_value=self._api.MAX_RECORDS_PER_UPSERT_BULK,
    )

    created_or_updated = []
    records_updated = 0
    for batch in range(0, len(record_models), batch_size):
        self.log(message=f"Sending records from {batch} to {batch + batch_size}.")
        batch_records = record_models[batch : batch + batch_size]
        models, updated = self._api.bulk_upsert(dataset_id=self.__dataset.id, records=batch_records)
        created_or_updated.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])
        records_updated += updated

    records_created = len(created_or_updated) - records_updated
    self.log(
        message=f"Updated {records_updated} records and added {records_created} records to dataset {self.__dataset.name}",
        level="info",
    )

    return created_or_updated