
zarrnii.benchmark

Benchmarking API for evaluating OME-Zarr chunking/sharding choices and Dask scheduler setups.

Benchmarking tool for OME-Zarr chunking, sharding, and Dask configuration.

This module helps users choose chunking, sharding, and Dask scheduler settings suited to their workloads. Each benchmark run performs the steps below, recording wall-clock time, CPU time, and CPU efficiency for the write and the read, plus peak memory for the run as a whole:

  1. Create a synthetic Dask array of configurable shape / dtype.
  2. Write the array to an OME-Zarr store (via zarrnii.save_ngff_image).
  3. Read the array back from the store.
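
The write and read steps are each characterised by wall-clock time and CPU time. A minimal sketch of timing one step that way, assuming the standard-library clocks time.perf_counter (wall time) and time.process_time (user+system CPU time of the current process); the helper name timed is hypothetical, and this page does not show how zarrnii itself takes the measurement. Note that process_time only counts the current process, so CPU time spent in separate dask.distributed worker processes would not be captured by this approach.

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[[], Any]) -> Tuple[Any, float, float]:
    """Run fn() once and return (result, wall_seconds, cpu_seconds).

    Hypothetical helper: wall time comes from perf_counter, CPU time from
    process_time (user + system time summed over all threads of this process).
    """
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    result = fn()
    cpu_s = time.process_time() - cpu_start
    wall_s = time.perf_counter() - wall_start
    return result, wall_s, cpu_s


# Stand-in workload; in a real benchmark this would be the OME-Zarr write or read.
_, wall_s, cpu_s = timed(lambda: sum(i * i for i in range(200_000)))
print(f"wall={wall_s:.3f}s cpu={cpu_s:.3f}s")
```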

Typical usage

Run from the command line:

zarrnii-benchmark --shape 64 256 256 \
    --chunks 32,32,32 64,64,64 \
    --shards none 64,64,64 \
    --dask-configs threads:4 distributed:4:2 \
    --output-dir ./bench_results

Or use the Python API:

from zarrnii.benchmark import BenchmarkSuite

suite = BenchmarkSuite(
    shape=(64, 256, 256),
    dtype="float32",
    chunk_shapes=[(32, 32, 32), (64, 64, 64)],
    dask_configs=[{"scheduler": "threads", "n_threads": 4}],
    output_dir="./bench_results",
)
results_df = suite.run()
suite.generate_report(results_df)

Classes

zarrnii.benchmark.DaskConfig(scheduler='threads', n_threads=4, threads_per_worker=2, label='') dataclass

Specification for a Dask scheduler setup.

Attributes:

  • scheduler (str) –

    "threads" for the built-in threaded scheduler or "distributed" for a dask.distributed.LocalCluster.

  • n_threads (int) –

    Total number of threads/cores available. For the "threads" scheduler this is passed directly as num_workers. For "distributed" it is used together with threads_per_worker to derive the worker count.

  • threads_per_worker (int) –

    Threads allocated to each distributed worker. Ignored when scheduler is "threads".

  • label (str) –

    Human-readable identifier used in reports. Auto-generated when not provided.
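
For the "distributed" scheduler, n_threads and threads_per_worker together determine how many workers are started. A minimal sketch of one plausible derivation, assuming integer division with a floor of one worker; the function worker_layout is hypothetical, and the exact rule used by zarrnii is not shown on this page.

```python
from typing import Tuple


def worker_layout(scheduler: str, n_threads: int, threads_per_worker: int) -> Tuple[int, int]:
    """Return a hypothetical (n_processes, threads_each) pair for a DaskConfig-like spec.

    Assumption: for "distributed", workers = n_threads // threads_per_worker
    (at least 1). For "threads" there is a single process whose thread pool
    has n_threads threads, so threads_per_worker is ignored.
    """
    if scheduler == "threads":
        return 1, n_threads
    if scheduler == "distributed":
        if threads_per_worker < 1:
            raise ValueError("threads_per_worker must be >= 1")
        return max(1, n_threads // threads_per_worker), threads_per_worker
    raise ValueError(f"unknown scheduler: {scheduler!r}")
```

Under this assumption, DaskConfig(scheduler="distributed", n_threads=8, threads_per_worker=2) would correspond to a LocalCluster of 4 workers with 2 threads each.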

zarrnii.benchmark.BenchmarkConfig(shape, dtype, chunk_shape, shard_shape, dask_config) dataclass

Full configuration for one benchmark scenario.

Attributes:

  • shape (Tuple[int, ...]) –

    Spatial shape of the synthetic array, e.g. (64, 256, 256).

  • dtype (str) –

    NumPy dtype string, e.g. "float32".

  • chunk_shape (Tuple[int, ...]) –

    Zarr chunk shape.

  • shard_shape (Optional[Tuple[int, ...]]) –

    Zarr shard shape (None disables sharding).

  • dask_config (DaskConfig) –

    Dask scheduler configuration.

Attributes

zarrnii.benchmark.BenchmarkConfig.label property

Short human-readable identifier for this config.

zarrnii.benchmark.BenchmarkResult(config, repetition, write_wall_s=0.0, read_wall_s=0.0, write_cpu_s=0.0, read_cpu_s=0.0, peak_memory_mb=0.0, error='') dataclass

Timing and resource metrics for one benchmark run.

Attributes:

  • config (BenchmarkConfig) –

    The configuration that produced this result.

  • repetition (int) –

    0-based repetition index.

  • write_wall_s (float) –

    Wall-clock seconds for the OME-Zarr write.

  • read_wall_s (float) –

    Wall-clock seconds for the OME-Zarr read-back.

  • write_cpu_s (float) –

    User+system CPU seconds consumed during write.

  • read_cpu_s (float) –

    User+system CPU seconds consumed during read.

  • peak_memory_mb (float) –

    Peak RSS memory in megabytes during the run.

  • error (str) –

    Non-empty string when the run raised an exception.
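
peak_memory_mb is described as peak RSS. One way to read a process's peak RSS on Unix-like systems is resource.getrusage. A minimal sketch follows, with two caveats: ru_maxrss is reported in kilobytes on Linux but in bytes on macOS, and it is the high-water mark for the whole process lifetime, so it does not reset between runs. The helper name is hypothetical; this page does not show how zarrnii collects the value, and the resource module is not available on Windows.

```python
import resource
import sys


def peak_rss_mb() -> float:
    """Return this process's peak resident set size in megabytes (MiB), Unix only.

    ru_maxrss is in kilobytes on Linux and in bytes on macOS.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        return peak / (1024 * 1024)
    return peak / 1024
```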

Attributes

zarrnii.benchmark.BenchmarkResult.total_wall_s property

Total wall time (write + read).

zarrnii.benchmark.BenchmarkResult.write_cpu_efficiency property

CPU efficiency during write (cpu_s / wall_s / n_threads).

zarrnii.benchmark.BenchmarkResult.read_cpu_efficiency property

CPU efficiency during read (cpu_s / wall_s / n_threads).
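
Both efficiency properties divide CPU seconds by wall seconds and by the thread count, so 1.0 means every thread was busy for the whole step. A worked sketch of the formula; the function name and the zero-wall-time guard are assumptions, not taken from zarrnii.

```python
def cpu_efficiency(cpu_s: float, wall_s: float, n_threads: int) -> float:
    """Fraction of available thread-time spent on CPU: cpu_s / wall_s / n_threads.

    Returns 0.0 when wall_s or n_threads is not positive (hypothetical guard).
    """
    if wall_s <= 0 or n_threads <= 0:
        return 0.0
    return cpu_s / wall_s / n_threads


# 6 CPU-seconds over a 2-second write with 4 threads: 6 / 2 / 4 = 0.75
print(cpu_efficiency(6.0, 2.0, 4))
```

Values well below 1.0 indicate that threads were idle for much of the step, for example while waiting on I/O.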

Functions

zarrnii.benchmark.BenchmarkResult.to_dict()

Serialise to a flat dictionary suitable for CSV / pandas.

Source code in zarrnii/benchmark.py
def to_dict(self) -> Dict:
    """Serialise to a flat dictionary suitable for CSV / pandas."""
    cfg = self.config
    dc = cfg.dask_config
    return {
        "shape": str(cfg.shape),
        "dtype": cfg.dtype,
        "chunk_shape": str(cfg.chunk_shape),
        "shard_shape": str(cfg.shard_shape) if cfg.shard_shape else "",
        "scheduler": dc.scheduler,
        "n_threads": dc.n_threads,
        "threads_per_worker": dc.threads_per_worker,
        "dask_label": dc.label,
        "repetition": self.repetition,
        "write_wall_s": round(self.write_wall_s, 4),
        "read_wall_s": round(self.read_wall_s, 4),
        "total_wall_s": round(self.total_wall_s, 4),
        "write_cpu_s": round(self.write_cpu_s, 4),
        "read_cpu_s": round(self.read_cpu_s, 4),
        "write_cpu_efficiency": round(self.write_cpu_efficiency, 4),
        "read_cpu_efficiency": round(self.read_cpu_efficiency, 4),
        "peak_memory_mb": round(self.peak_memory_mb, 2),
        "error": self.error,
    }
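
to_dict stores shape, chunk_shape and shard_shape as the str() of a tuple, with an empty string when sharding is disabled, so a saved CSV holds text such as "(32, 32, 32)". A minimal sketch for turning those cells back into tuples when re-loading results, using ast.literal_eval; the helper name parse_shape_cell is hypothetical.

```python
import ast
from typing import Optional, Tuple


def parse_shape_cell(cell: object) -> Optional[Tuple[int, ...]]:
    """Convert a "(32, 32, 32)"-style cell back to a tuple of ints.

    Empty strings (to_dict's marker for "no sharding") and non-string values
    such as the NaN pandas uses for empty CSV cells map to None.
    """
    if not isinstance(cell, str) or not cell.strip():
        return None
    value = ast.literal_eval(cell)  # evaluates Python literals only, no code
    return tuple(int(v) for v in value)


print(parse_shape_cell("(32, 32, 32)"))  # (32, 32, 32)
print(parse_shape_cell(""))              # None
```

With pandas, this could be applied column-wise, e.g. df["shard_shape"].map(parse_shape_cell).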

zarrnii.benchmark.BenchmarkSuite(shape=(64, 256, 256), dtype='float32', chunk_shapes=None, shard_shapes=None, dask_configs=None, n_reps=3, output_dir='.', tmp_dir=None)

Orchestrate a set of ZarrNii read/write benchmarks.

Parameters

  • shape (Tuple[int, ...]) –

    Spatial shape of the synthetic 3-D array, e.g. (64, 256, 256).

  • dtype (str) –

    NumPy dtype string, e.g. "float32".

  • chunk_shapes (Optional[Sequence[Tuple[int, ...]]]) –

    List of chunk shapes to sweep. Defaults to a single chunk shape of half the array shape along each axis.

  • shard_shapes (Optional[Sequence[Optional[Tuple[int, ...]]]]) –

    List of shard shapes to sweep; use [None] (the default) to disable sharding.

  • dask_configs (Optional[Sequence[Union[DaskConfig, Dict]]]) –

    List of DaskConfig objects, or plain dicts with keys scheduler, n_threads and threads_per_worker. Defaults to a single threaded scheduler with n_threads=4.

  • n_reps (int) –

    Number of repetitions per configuration. run() returns every repetition as its own row; generate_report() averages them in the summary.

  • output_dir (str) –

    Directory where the CSV results and HTML report are written. Defaults to the current working directory.

  • tmp_dir (Optional[str]) –

    Root directory for the temporary OME-Zarr stores written during benchmarking (passed as dir to tempfile.TemporaryDirectory). When None (the default), the system temporary directory is used. Point this at a directory on a different filesystem (e.g. a network mount) to measure I/O performance on that filesystem.

Examples

from zarrnii.benchmark import BenchmarkSuite, DaskConfig

suite = BenchmarkSuite(
    shape=(32, 128, 128),
    dtype="float32",
    chunk_shapes=[(16, 64, 64), (32, 128, 128)],
    dask_configs=[DaskConfig(scheduler="threads", n_threads=4)],
    n_reps=1,
    output_dir="/tmp/bench",
)
df = suite.run()
suite.generate_report(df)

Source code in zarrnii/benchmark.py
def __init__(
    self,
    shape: Tuple[int, ...] = (64, 256, 256),
    dtype: str = "float32",
    chunk_shapes: Optional[Sequence[Tuple[int, ...]]] = None,
    shard_shapes: Optional[Sequence[Optional[Tuple[int, ...]]]] = None,
    dask_configs: Optional[Sequence[Union[DaskConfig, Dict]]] = None,
    n_reps: int = 3,
    output_dir: str = ".",
    tmp_dir: Optional[str] = None,
) -> None:
    self.shape = shape
    self.dtype = dtype
    self.chunk_shapes: List[Tuple[int, ...]] = list(
        chunk_shapes or [tuple(s // 2 for s in shape)]
    )
    self.shard_shapes: List[Optional[Tuple[int, ...]]] = list(
        shard_shapes or [None]
    )
    self.n_reps = n_reps
    self.output_dir = Path(output_dir)
    self.tmp_dir = tmp_dir  # root dir for TemporaryDirectory; None → system default

    # Normalise dask_configs to DaskConfig objects
    raw_configs = dask_configs or [DaskConfig(scheduler="threads", n_threads=4)]
    self.dask_configs: List[DaskConfig] = [
        DaskConfig(**c) if isinstance(c, dict) else c for c in raw_configs
    ]
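
The constructor falls back to one chunk shape of half the array size per axis, no sharding, and a single threads:4 configuration, and run() then executes len(configs) × n_reps runs. The sketch below lists the planned runs before starting a sweep, assuming that _build_configs (not shown on this page) forms the Cartesian product of chunk shapes, shard shapes and Dask configurations. planned_runs is a hypothetical helper, and plain label strings stand in for DaskConfig objects.

```python
from itertools import product
from typing import List, Optional, Sequence, Tuple

Shape = Tuple[int, ...]


def planned_runs(
    shape: Shape,
    chunk_shapes: Optional[Sequence[Shape]] = None,
    shard_shapes: Optional[Sequence[Optional[Shape]]] = None,
    dask_labels: Optional[Sequence[str]] = None,
    n_reps: int = 3,
) -> List[Tuple[Shape, Optional[Shape], str, int]]:
    """List (chunk, shard, dask_label, rep) tuples, mirroring the defaults in __init__.

    Assumption: every chunk/shard/dask combination is benchmarked.
    """
    chunks = list(chunk_shapes or [tuple(s // 2 for s in shape)])
    shards = list(shard_shapes or [None])
    dasks = list(dask_labels or ["threads:4"])
    return [
        (c, s, d, rep)
        for c, s, d in product(chunks, shards, dasks)
        for rep in range(n_reps)
    ]
```

With all defaults and shape (64, 256, 256) this yields 3 runs with chunk shape (32, 128, 128); two chunk shapes, two shard options and two Dask configurations at n_reps=3 would yield 24.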

Functions

zarrnii.benchmark.BenchmarkSuite.run()

Run all benchmark configurations and return a pandas.DataFrame.

Returns

pandas.DataFrame – One row per (configuration, repetition) with timing and resource columns (see BenchmarkResult for their descriptions), plus a tmp_dir_root column recording the root directory of the temporary stores.

Source code in zarrnii/benchmark.py
def run(self) -> "pd.DataFrame":  # noqa: F821
    """Run all benchmark configurations and return a :class:`pandas.DataFrame`.

    Returns
    -------
    pandas.DataFrame
        One row per (configuration, repetition) with timing and resource
        columns.  See :class:`BenchmarkResult` for column descriptions.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        raise ImportError("pandas is required for benchmark reporting") from exc

    configs = self._build_configs()
    total = len(configs) * self.n_reps
    logger.info(
        "Starting benchmark: %d configs × %d reps = %d runs",
        len(configs),
        self.n_reps,
        total,
    )

    rows = []
    with tempfile.TemporaryDirectory(dir=self.tmp_dir) as tmp_dir:
        # tmp_dir_root is a suite-level value shared across all runs:
        # use the user-specified root if provided, otherwise derive it
        # from the actual temp directory created by the OS.
        tmp_dir_root = (
            str(self.tmp_dir)
            if self.tmp_dir is not None
            else str(Path(tmp_dir).parent)
        )
        for i, cfg in enumerate(configs):
            for rep in range(self.n_reps):
                run_num = i * self.n_reps + rep + 1
                logger.info(
                    "[%d/%d] %s (rep %d/%d)",
                    run_num,
                    total,
                    cfg.label,
                    rep + 1,
                    self.n_reps,
                )
                result = _run_single(cfg, rep, tmp_dir)
                row = result.to_dict()
                row["tmp_dir_root"] = tmp_dir_root
                rows.append(row)
                _print_result(result)

    df = pd.DataFrame(rows)
    return df
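
run() writes all stores under one tempfile.TemporaryDirectory and records its root in a tmp_dir_root column: the tmp_dir argument if given, otherwise the parent of the directory the OS created. A standalone sketch of that rule using only the standard library; resolve_tmp_root is a hypothetical name.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_tmp_root(tmp_dir: Optional[str]) -> str:
    """Create (and clean up) a scratch directory; return the root recorded for the run.

    Mirrors the logic in run(): the user-supplied root when given, otherwise the
    parent of the directory that TemporaryDirectory created.
    """
    with tempfile.TemporaryDirectory(dir=tmp_dir) as scratch:
        return str(tmp_dir) if tmp_dir is not None else str(Path(scratch).parent)
```

Because the root is recorded on every row, results gathered on several filesystems (for example local disk versus a network mount) can be concatenated and still told apart.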

zarrnii.benchmark.BenchmarkSuite.generate_report(df)

Write the raw results CSV, a per-configuration summary CSV, and an HTML report to output_dir, then print the best configurations.

Parameters

  • df (pandas.DataFrame) –

    DataFrame returned by run().

Source code in zarrnii/benchmark.py
def generate_report(self, df: "pd.DataFrame") -> None:  # noqa: F821
    """Write CSV and HTML summary reports to :attr:`output_dir`.

    Parameters
    ----------
    df:
        DataFrame returned by :meth:`run`.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        raise ImportError("pandas is required for benchmark reporting") from exc

    self.output_dir.mkdir(parents=True, exist_ok=True)

    # Raw CSV
    csv_path = self.output_dir / "benchmark_results.csv"
    df.to_csv(csv_path, index=False)
    logger.info("Raw results saved to %s", csv_path)

    # JSON summary (averages per config key)
    summary_df = (
        df[df["error"] == ""]
        .groupby(
            [
                "shape",
                "dtype",
                "chunk_shape",
                "shard_shape",
                "scheduler",
                "n_threads",
                "threads_per_worker",
                "dask_label",
            ],
            dropna=False,
        )
        .agg(
            write_wall_s_mean=("write_wall_s", "mean"),
            write_wall_s_std=("write_wall_s", "std"),
            read_wall_s_mean=("read_wall_s", "mean"),
            read_wall_s_std=("read_wall_s", "std"),
            total_wall_s_mean=("total_wall_s", "mean"),
            write_cpu_efficiency_mean=("write_cpu_efficiency", "mean"),
            read_cpu_efficiency_mean=("read_cpu_efficiency", "mean"),
            peak_memory_mb_max=("peak_memory_mb", "max"),
            n_reps=("repetition", "count"),
        )
        .reset_index()
    )

    summary_csv = self.output_dir / "benchmark_summary.csv"
    summary_df.to_csv(summary_csv, index=False)
    logger.info("Summary saved to %s", summary_csv)

    # HTML report
    html_path = self.output_dir / "benchmark_report.html"
    # tmp_dir_root is the same for every row in a single suite run;
    # take the first value if present.
    tmp_dir_root = (
        df["tmp_dir_root"].iloc[0]
        if "tmp_dir_root" in df.columns and not df.empty
        else None
    )
    _write_html_report(df, summary_df, html_path, tmp_dir_root=tmp_dir_root)
    logger.info("HTML report saved to %s", html_path)

    # Print best configs
    _print_best_configs(summary_df)
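
generate_report drops failed runs (non-empty error), groups the rest by configuration, and reports means and standard deviations of wall times plus the maximum peak memory. For readers without pandas, a stdlib sketch of a similar aggregation over to_dict()-style rows. It is a simplification: groups are keyed on chunk shape, shard shape and Dask label only (not all eight grouping columns), and only a few of the summary columns are reproduced. Like pandas' default sample standard deviation, it yields NaN for a group with a single repetition. summarize is a hypothetical helper.

```python
import math
import statistics
from collections import defaultdict
from typing import Dict, List, Tuple

Key = Tuple[str, str, str]


def summarize(rows: List[Dict]) -> Dict[Key, Dict[str, float]]:
    """Aggregate successful runs per (chunk_shape, shard_shape, dask_label)."""
    groups: Dict[Key, List[Dict]] = defaultdict(list)
    for row in rows:
        if row["error"] == "":  # failed runs are excluded, as in generate_report
            key = (row["chunk_shape"], row["shard_shape"], row["dask_label"])
            groups[key].append(row)

    summary: Dict[Key, Dict[str, float]] = {}
    for key, runs in groups.items():
        writes = [r["write_wall_s"] for r in runs]
        summary[key] = {
            "write_wall_s_mean": statistics.mean(writes),
            # Sample std (ddof=1) like pandas; undefined for a single run.
            "write_wall_s_std": statistics.stdev(writes) if len(writes) > 1 else math.nan,
            "total_wall_s_mean": statistics.mean(r["total_wall_s"] for r in runs),
            "peak_memory_mb_max": max(r["peak_memory_mb"] for r in runs),
            "n_reps": len(runs),
        }
    return summary
```

One simple notion of the fastest configuration is then min(summary, key=lambda k: summary[k]["total_wall_s_mean"]); the criteria used by _print_best_configs are not shown on this page.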

Functions

zarrnii.benchmark.benchmark_cli(argv=None)

Entry point for the zarrnii-benchmark command-line tool.

Parameters

  • argv (Optional[List[str]]) –

    Argument list (uses sys.argv[1:] when None).

Source code in zarrnii/benchmark.py
def benchmark_cli(argv: Optional[List[str]] = None) -> None:
    """Entry point for the ``zarrnii-benchmark`` command-line tool.

    Parameters
    ----------
    argv:
        Argument list (uses ``sys.argv[1:]`` when ``None``).
    """
    import argparse

    parser = argparse.ArgumentParser(
        prog="zarrnii-benchmark",
        description=(
            "Benchmark OME-Zarr write/read performance across different "
            "chunking and Dask configurations."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  zarrnii-benchmark
  zarrnii-benchmark --shape 64 256 256 --chunks 32,32,32 64,64,64
  zarrnii-benchmark --shape 64 256 256 --chunks 32,32,32 --shards none 64,64,64
  zarrnii-benchmark --dask-configs threads:4 distributed:8:2
  zarrnii-benchmark --shape 128 512 512 --chunks 64,64,64 \\
      --shards none 128,128,128 \\
      --dask-configs threads:8 distributed:8:2 \\
      --output-dir ./results --n-reps 5
  zarrnii-benchmark --tmp-dir /mnt/network_fs --output-dir ./net_results
""",
    )
    parser.add_argument(
        "--shape",
        nargs=3,
        type=int,
        default=[64, 256, 256],
        metavar=("Z", "Y", "X"),
        help="Shape of the synthetic 3-D array (default: 64 256 256)",
    )
    parser.add_argument(
        "--dtype",
        default="float32",
        help="NumPy dtype for the synthetic array (default: float32)",
    )
    parser.add_argument(
        "--chunks",
        nargs="+",
        default=None,
        metavar="Z,Y,X",
        help=(
            "One or more chunk shapes as comma-separated integers, "
            "e.g. --chunks 32,32,32 64,64,64  (default: half the array shape)"
        ),
    )
    parser.add_argument(
        "--shards",
        nargs="+",
        default=None,
        metavar="Z,Y,X|none",
        help=(
            "One or more shard shapes as comma-separated integers, or 'none' to "
            "disable sharding, e.g. --shards none 128,128,128  (default: none)"
        ),
    )
    parser.add_argument(
        "--dask-configs",
        nargs="+",
        default=["threads:4"],
        metavar="SPEC",
        help=(
            "One or more Dask scheduler specs.  "
            "Format: threads:N or distributed:N:M  "
            "(default: threads:4)"
        ),
    )
    parser.add_argument(
        "--n-reps",
        type=int,
        default=3,
        help="Number of repetitions per configuration (default: 3)",
    )
    parser.add_argument(
        "--output-dir",
        default=".",
        help="Directory for CSV and HTML report output (default: current dir)",
    )
    parser.add_argument(
        "--tmp-dir",
        default=None,
        metavar="DIR",
        help=(
            "Root directory for temporary OME-Zarr stores created during "
            "benchmarking (default: system temp dir).  Use this to benchmark "
            "I/O on a specific filesystem, e.g. a network mount."
        ),
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable verbose logging",
    )

    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    shape = tuple(args.shape)
    chunk_shapes = [_parse_chunk(c) for c in args.chunks] if args.chunks else None
    shard_shapes = [_parse_shard(s) for s in args.shards] if args.shards else None
    dask_configs = [_parse_dask_config(s) for s in args.dask_configs]

    suite = BenchmarkSuite(
        shape=shape,
        dtype=args.dtype,
        chunk_shapes=chunk_shapes,
        shard_shapes=shard_shapes,
        dask_configs=dask_configs,
        n_reps=args.n_reps,
        output_dir=args.output_dir,
        tmp_dir=args.tmp_dir,
    )

    print("ZarrNii Benchmark")
    print(f"  Shape:       {shape}")
    print(f"  Dtype:       {args.dtype}")
    print(f"  Chunks:      {suite.chunk_shapes}")
    print(f"  Shards:      {suite.shard_shapes}")
    print(f"  Dask configs:{[dc.label for dc in dask_configs]}")
    print(f"  Reps:        {args.n_reps}")
    print(f"  Tmp dir:     {args.tmp_dir or '(system default)'}")
    print(f"  Output dir:  {args.output_dir}")
    print()

    df = suite.run()
    suite.generate_report(df)
    print(f"\nReport written to: {args.output_dir}")
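
The CLI converts each --chunks, --shards and --dask-configs token with helpers (_parse_chunk, _parse_shard, _parse_dask_config) whose source is not shown here. A hypothetical re-implementation based only on the formats in the help text ("Z,Y,X", "none", "threads:N", "distributed:N:M"). It assumes N maps to n_threads and M to threads_per_worker, and returns plain dicts, which BenchmarkSuite accepts in place of DaskConfig objects.

```python
from typing import Dict, Optional, Tuple, Union


def parse_chunk(spec: str) -> Tuple[int, ...]:
    """'32,32,32' -> (32, 32, 32)."""
    return tuple(int(part) for part in spec.split(","))


def parse_shard(spec: str) -> Optional[Tuple[int, ...]]:
    """'none' -> None (sharding disabled); otherwise the same format as parse_chunk."""
    return None if spec.strip().lower() == "none" else parse_chunk(spec)


def parse_dask_config(spec: str) -> Dict[str, Union[str, int]]:
    """'threads:N' or 'distributed:N:M' -> DaskConfig keyword arguments (assumed mapping)."""
    parts = spec.split(":")
    if parts[0] == "threads" and len(parts) == 2:
        return {"scheduler": "threads", "n_threads": int(parts[1])}
    if parts[0] == "distributed" and len(parts) == 3:
        return {
            "scheduler": "distributed",
            "n_threads": int(parts[1]),
            "threads_per_worker": int(parts[2]),
        }
    raise ValueError(f"expected threads:N or distributed:N:M, got {spec!r}")
```

Under this assumption, parse_dask_config("distributed:8:2") gives n_threads=8 and threads_per_worker=2.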