DataChain
Bases: DatasetQuery
AI 🔗 DataChain - a data structure for batch data processing and evaluation.
It represents a sequence of data manipulation steps such as reading data from storages, running AI or LLM models, or calling external service APIs to validate or enrich data.
Data in DataChain is presented as Python classes with an arbitrary set of fields, including nested classes. The data classes have to inherit from the Feature class.
The supported field types include most of the types supported by the underlying library, Pydantic.
See Also
DataChain.from_storage("s3://my-bucket/my-dir/") - reading unstructured data files from storages such as S3, GCS, or Azure ADLS.
DataChain.save("name") - saving to a dataset.
DataChain.from_dataset("name") - reading from a dataset.
DataChain.from_values(fib=[1, 2, 3, 5, 8]) - generating from values.
Example
import json

from datachain import C, DataChain, Feature
from datachain.lib.claude import claude_processor

class Rating(Feature):
    status: str = ""
    explanation: str = ""

PROMPT = "A 'user' is a human trying to find the best mobile plan.... "
MODEL = "claude-3-opus-20240229"

chain = (
    DataChain.from_storage("s3://my-bucket/my")
    .filter(C.name.glob("*.txt"))
    .limit(5)
    .map(claude=claude_processor(prompt=PROMPT, model=MODEL))
    .map(
        rating=lambda claude: Rating(
            **(json.loads(claude.content[0].text) if claude.content else {})
        ),
        output=Rating,
    )
)
chain.save("ratings")
print(chain)
agg
agg(
func: Optional[Callable] = None,
partition_by: Optional[PartitionByType] = None,
params: Union[None, str, Sequence[str]] = None,
output: OutputType = None,
**signal_map
) -> Self
Aggregate rows using the partition_by statement and apply a function to each group of aggregated rows. The function needs to return new objects for each group of new rows. It returns the chain itself with new signals.
Input-output relationship: N:M
This method bears similarity to gen() and map(), employing a comparable set of parameters, yet differs in two crucial aspects:
1. The partition_by parameter: This specifies the column name or a list of column names that determine the grouping criteria for aggregation.
2. Group-based UDF function input: Instead of individual rows, the function receives a list of all rows within each group defined by partition_by.
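A hedged sketch: group files by parent directory and sum their sizes. The file signal with its size and parent fields is assumed from the default from_storage schema, and the returned single-element list stands for the one new row per group:
>>> chain = chain.agg(
...     total=lambda file: [sum(f.size for f in file)],
...     partition_by=C("file.parent"),
...     output=int,
... )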
batch_map
batch_map(
func: Optional[Callable] = None,
params: Union[None, str, Sequence[str]] = None,
output: OutputType = None,
**signal_map
) -> Self
This is a batch version of map().
It accepts the same parameters plus an additional parameter:
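A hedged sketch: derive a signal by processing values in batches (the signal names are illustrative; the function receives a list of input values and returns a list of results):
>>> chain = chain.batch_map(
...     name_len=lambda name: [len(n) for n in name],
...     output=int,
... )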
create_empty
classmethod
create_empty(
to_insert: Optional[Union[dict, list[dict]]],
session: Optional[Session] = None,
) -> DataChain
Create an empty chain. Returns a chain. This method is used for generating chains programmatically, as opposed to reading data from storages or other sources.
Parameters:
- to_insert – Records (or a single record) to insert. Each record is a dictionary of signals and their values.
Examples:
>>> empty = DataChain.create_empty()
>>> single_record = DataChain.create_empty(DataChain.DEFAULT_FILE_RECORD)
from_csv
classmethod
from_csv(
path,
delimiter: str = ",",
header: bool = True,
column_names: Optional[list[str]] = None,
output: OutputType = None,
object_name: str = "",
model_name: str = "",
**kwargs
) -> DataChain
Generate chain from csv files.
Parameters:
- path – Storage URI with directory. URI must start with a storage prefix such as s3://, gs://, az://, or "file:///".
- delimiter – Character for delimiting columns.
- header – Whether the files include a header row.
- output – Dictionary or feature class defining column names and their corresponding types. List of column names is also accepted, in which case types will be inferred.
- object_name – Created object column name.
- model_name – Generated model name.
Examples:
Reading a csv file:
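A minimal sketch; the bucket path is illustrative:
>>> dc = DataChain.from_csv("s3://mybucket/file.csv")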
Reading csv files from a directory as a combined dataset:
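A sketch with an illustrative directory URI:
>>> dc = DataChain.from_csv("s3://mybucket/dir/")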
from_dataset
classmethod
Get data from a dataset. It returns the chain itself.
Parameters:
- name – dataset name
- version – dataset version
Examples:
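A short sketch, reusing the dataset name saved in the class-level example above:
>>> chain = DataChain.from_dataset("ratings")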
from_json
classmethod
from_json(
path,
type: Literal["binary", "text", "image"] = "text",
spec: Optional[DataType] = None,
schema_from: Optional[str] = "auto",
jmespath: Optional[str] = None,
object_name: str = "",
model_name: Optional[str] = None,
show_schema: Optional[bool] = False,
meta_type: Optional[str] = "json",
**kwargs
) -> DataChain
Get data from JSON. It returns the chain itself.
Parameters:
- path – storage URI with directory. URI must start with a storage prefix such as s3://, gs://, az://, or "file:///".
- type – read file as "binary", "text", or "image" data. Default is "text".
- spec – optional Data Model
- schema_from – path to sample to infer spec from
- object_name – generated object column name
- model_name – generated model name
- show_schema – print auto-generated schema
- jmespath – JMESPath expression to reduce JSON
Examples:
infer JSON schema from data, reduce using JMESPATH, print schema
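A sketch using the demo bucket from the show_json_schema example further below; the JMESPath key is illustrative:
>>> uri = "gs://datachain-demo/coco2017/annotations_captions/"
>>> chain = DataChain.from_json(uri, jmespath="annotations", show_schema=True)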
infer JSON schema from a particular path, print data model
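Another sketch; the sample path is illustrative:
>>> chain = DataChain.from_json(uri, schema_from="s3://mybucket/sample.json", show_schema=True)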
from_pandas
classmethod
from_pandas(
df: DataFrame,
name: str = "",
session: Optional[Session] = None,
object_name: str = "",
) -> DataChain
Generate chain from a pandas DataFrame.
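A minimal sketch:
>>> import pandas as pd
>>> df = pd.DataFrame({"fib": [1, 2, 3, 5, 8]})
>>> chain = DataChain.from_pandas(df)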
from_parquet
classmethod
from_parquet(
path,
partitioning: Any = "hive",
output: Optional[dict[str, DataType]] = None,
object_name: str = "",
model_name: str = "",
**kwargs
) -> DataChain
Generate chain from parquet files.
Parameters:
- path – Storage URI with directory. URI must start with a storage prefix such as s3://, gs://, az://, or "file:///".
- partitioning – Any pyarrow partitioning schema.
- output – Dictionary defining column names and their corresponding types.
- object_name – Created object column name.
- model_name – Generated model name.
Examples:
Reading a single file:
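A minimal sketch; the path is illustrative:
>>> dc = DataChain.from_parquet("s3://mybucket/file.parquet")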
Reading a partitioned dataset from a directory:
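A sketch relying on the default hive partitioning; the directory URI is illustrative:
>>> dc = DataChain.from_parquet("s3://mybucket/dir/")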
from_storage
classmethod
from_storage(
path,
*,
type: Literal["binary", "text", "image"] = "binary",
session: Optional[Session] = None,
recursive: Optional[bool] = True,
object_name: str = "file",
**kwargs
) -> Self
Get data from storage as a list of files with all file attributes. It returns the chain itself as usual.
Parameters:
- path – storage URI with directory. URI must start with a storage prefix such as s3://, gs://, az://, or "file:///".
- type – read file as "binary", "text", or "image" data. Default is "binary".
- recursive – search recursively for the given path.
- object_name – Created object column name.
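A sketch reading text files from the illustrative bucket used in the See Also section:
>>> chain = DataChain.from_storage("s3://my-bucket/my-dir/", type="text")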
from_values
classmethod
from_values(
ds_name: str = "",
session: Optional[Session] = None,
output: OutputType = None,
object_name: str = "",
**fr_map
) -> DataChain
Generate chain from a list of values.
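A minimal sketch, mirroring the See Also example:
>>> chain = DataChain.from_values(fib=[1, 2, 3, 5, 8])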
gen
gen(
func: Optional[Callable] = None,
params: Union[None, str, Sequence[str]] = None,
output: OutputType = None,
**signal_map
) -> Self
Apply a function to each row to create new rows (with potentially new signals). The function needs to return new objects for each of the new rows. It returns the chain itself with new signals.
Input-output relationship: 1:N
This method is similar to map() and uses the same list of parameters, but with one key difference: it produces a sequence of rows for each input row (like extracting multiple file records from a single tar file or bounding boxes from a single image file).
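A hedged sketch that emits several rows per input row (the signal names are illustrative; the function returns a sequence of values, one per new row):
>>> chain = chain.gen(
...     word=lambda name: name.split("_"),
...     output=str,
... )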
iterate
Iterate over rows.
If columns are specified, limit the output to those columns.
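A short sketch, assuming iterate accepts column names as positional arguments and that the default from_storage schema is in place:
>>> for row in chain.iterate("file.name"):
...     print(row)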
map
map(
func: Optional[Callable] = None,
params: Union[None, str, Sequence[str]] = None,
output: OutputType = None,
**signal_map
) -> Self
Apply a function to each row to create new signals. The function should return a new object for each row. It returns the chain itself with new signals.
Input-output relationship: 1:1
Parameters:
- func – Function applied to each row.
- params – List of column names used as input for the function. Default is taken from the function signature.
- output – Dictionary defining new signals and their corresponding types. Default type is taken from the function signature. Default can also be taken from kwargs - **signal_map (see below). If a signal name is defined using signal_map (see below), only a single type value can be used.
- **signal_map – kwargs can be used to define func together with its return signal name in the format map(my_sign=my_func). This helps define signal names and functions in a nicer way.
Examples:
Using signal_map and single type in output:
>>> chain = chain.map(value=lambda name: name[:-4] + ".json", output=str)
>>> chain.save("new_dataset")
Using func and output as a map:
>>> chain = chain.map(lambda name: name[:-4] + ".json", output={"res": str})
>>> chain.save("new_dataset")
merge
merge(
right_ds: DataChain,
on: Union[str, Sequence[str]],
right_on: Union[str, Sequence[str], None] = None,
inner=False,
rname="right_",
) -> Self
Merge two chains based on the specified criteria.
Parameters:
- right_ds – Chain to join with.
- on – Predicate or list of predicates to join on. If both chains have the same predicates, then this parameter is enough for the join. Otherwise, the right_on parameter has to specify the predicates for the other chain.
- right_on (Union[str, Sequence[str], None], default: None) – Optional predicate or list of predicates for the right_ds to join.
- inner (bool, default: False) – Whether to run an inner join or an outer join.
- rname (str, default: 'right_') – Name prefix for conflicting signal names.
Examples:
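A hedged sketch joining two chains on a shared id signal (all names and values are illustrative):
>>> images = DataChain.from_values(id=[1, 2], path=["a.jpg", "b.jpg"])
>>> labels = DataChain.from_values(id=[1, 2], label=["cat", "dog"])
>>> merged = images.merge(labels, on="id")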
parse_tabular
parse_tabular(
output: OutputType = None,
object_name: str = "",
model_name: str = "",
**kwargs
) -> DataChain
Generate chain from list of tabular files.
Parameters:
- output – Dictionary or feature class defining column names and their corresponding types. List of column names is also accepted, in which case types will be inferred.
- object_name – Generated object column name.
- model_name – Generated model name.
- kwargs – Parameters to pass to pyarrow.dataset.dataset.
Examples:
Reading a json lines file:
>>> dc = DataChain.from_storage("s3://mybucket/file.jsonl")
>>> dc = dc.parse_tabular(format="json")
Reading a filtered list of files as a dataset:
>>> dc = DataChain.from_storage("s3://mybucket")
>>> dc = dc.filter(C("file.name").glob("*.jsonl"))
>>> dc = dc.parse_tabular(format="json")
save
Save to a dataset. It returns the chain itself.
Parameters:
- name – dataset name. An empty name saves to a temporary dataset that will be removed after the process ends. Temporary datasets are useful for optimization.
- version – version of a dataset. Default is the last version that exists.
select
select(*args: str) -> Self
Select only a specified set of signals.
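For example, keeping only two signals (the signal names are illustrative):
>>> chain = chain.select("file.name", "rating")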
select_except
select_except(*args: str) -> Self
Select all the signals except the specified ones.
settings
Change settings for the chain.
This function changes the specified settings without changing the unspecified ones. It returns the chain, so it can be chained later with the next operation.
Parameters:
- cache – data caching (default=False)
- batch – size of the batch (default=1000)
- parallel – number of threads for processors. True is a special value to enable all available CPUs (default=1)
- workers – number of distributed workers. Only for Studio mode. (default=1)
- min_task_size – minimum number of tasks (default=1)
Example
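A sketch enabling caching and parallel processing:
>>> chain = chain.settings(cache=True, parallel=8)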
show_json_schema
Print JSON data model and save it. It returns the chain itself.
Parameters:
- jmespath – JMESPath expression to reduce JSON
- model_name – generated model name
Examples:
print JSON schema and save to column "meta_from":
>>> uri = "gs://datachain-demo/coco2017/annotations_captions/"
>>> chain = DataChain.from_storage(uri)
>>> chain = chain.show_json_schema()
>>> chain.save()
show_jsonl_schema
Print JSON lines (JSONL) data model and save it. It returns the chain itself.
Parameters:
- jmespath – JMESPath expression to reduce JSON
- model_name – generated model name
to_pytorch
Convert to PyTorch dataset format.
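A hedged sketch, assuming the returned dataset can be consumed by torch.utils.data.DataLoader:
>>> from torch.utils.data import DataLoader
>>> loader = DataLoader(chain.to_pytorch(), batch_size=16)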