The Birth of Parquet

145 points
1/20/1970
13 days ago
by whinvik

Comments


fizx

Fun story time: I was at twitter for a few years and tended to write quick hacks that people wanted to replace with better engineering.

We never had scala thrift bindings, and the Java ones were awkward from Scala, so I wrote a thrift plugin in JRuby that used the Ruby thrift parser and ERb web templates to output some Scala code. Integrated with our build pipeline, worked great for the company.

I also wrote one era of twitter's service deploy system on a hacked up Capistrano.

These projects took a few days because they were dirty hacks, but I still got a below perf review for getting easily distracted, because I didn't yet know how to sell those company-wide projects.

Anyhow, about a month before that team kicked off Parquet, I showed them a columnar format I made for a hackweek based on Lucene's codec packages, and was using to power a mixpanel-alike analytics system.

I'm not sure whether they were inspired or terrified that my hack would reach production, but I like to think I had a small hand in getting Parquet kickstarted.

11 days ago

hi-v-rocknroll

Hahaha. Don't ask how the sausage gets made, so long as the sausage machine is readily maintainable by more than one person or has documentation.

Partially explains how murder came to be https://github.com/lg/murder

> below perf review

That's some cheap bullshit. Fuck marketing-oriented corporate engineering.

10 days ago

colloydi

Tangential to the topic but regarding the supposed Snowball Effect there is in real life no such thing. I have pushed large 'snowballs' down slopes --in reality they are snow cylinders as shown in the photo-- and they invariably do not get far. The reason being that when one side of the cylinder randomly thickens slightly with respect to the other side this causes the whole thing to turn in the opposite direction.

For example, if the RHS of your cylinder has a slightly larger radius than the LHS the cylinder will commence turning to the left.

The upshot is the thick side picks up more snow than the thin side and the disparity in radii increases more rapidly still. The cylinder becomes a truncated cone which turns sideways and halts!

11 days ago

lsowen

It might be rare, but it _can_ happen: https://www.youtube.com/watch?v=ZdIclqha014

11 days ago

dendrite9

It is highly dependent on the snow conditions and the recent weather. Sometimes even just the a couple hours are enough to change the conditions to have a good chance of rollerballs. The climate also has an impact, in my experience more coastal areas have more periods when they form.

And in some cases the rollerballs get too tall for the bonding strength of the snow, so they break into parts that can restart the cycle if the slope is steep enough.

10 days ago

whinvik

Sounds to me like there's a long blog post waiting to be written.

11 days ago

pradeepchhetri

Reading through this blog, to me it seems Parquet is lot like ClickHouse native data format.

Best part of ClickHouse native data format is I can use the same ClickHouse queries and can run in local or remote server/cluster and let ClickHouse to decide the available resources in the most performant way.

ClickHouse has a native and the fastest integration with Parquet so i can:

- Query local/s3 parquet data from command line using clickhouse-local.

- Query large amount of local/s3 data programmatically by offloading it to clickhouse server/cluster which can do processing in distributed fashion.

11 days ago

pradeepchhetri

If you are interested in reading internals of using Parquet with ClickHouse, do read following articles:

- https://clickhouse.com/blog/apache-parquet-clickhouse-local-...

- https://clickhouse.com/blog/apache-parquet-clickhouse-local-...

11 days ago

calderwoodra

I've been struggeling with a tough parquet problem for a few months now.

I have a 15gb parquet file in a s3 bucket and I need to "unzip" and extract every row from the file to write into my database. The contents of the file are emails and I need to integrate them into our search function.

Is this possible to do without an unreasonable amount of RAM? Are there any affordable services that can help here?

Feel free to contact me (email in bio), happy to pay for a consult at the minimum.

11 days ago

chrisjc

DuckDB?

https://duckdb.org/2024/03/29/external-aggregation.html

https://duckdb.org/2021/06/25/querying-parquet.html

If your DB is mysql or postgres, then you could read a stream from parquet, transform inline and write out to your DB

https://duckdb.org/2024/01/26/multi-database-support-in-duck...

And an unrelated, but interesting read about the parquet bomb

https://duckdb.org/2024/03/26/42-parquet-a-zip-bomb-for-the-...

11 days ago

lonesword

I work with pyspark and parquet quite a lot. I never had to deal with parquet outside spark, but this is how I would do this:

- Write a pandas_udf function in pyspark.

- Parition your data into smaller bits so that the pandas_udf does not get too much data at the same time.

Something like:

```

from pyspark.sql import SparkSession

import pyspark.sql.functions as f

@f.pandas_udf(return_type=whatever)

def ingest(doc: pd.Series): # doc is a pandas series now

    # your processing goes here -> write to DB e.t.c

    pd_series_literal = Create a pd.Series that just contains the integer 0 to make spark happy

    return pd_series_literal
spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("s3 path")

df = df.repartition(1000). # bump up this number if you run into memory issues

df = df.withColumn("foo", ingest(f.col("doc_column"))

```

Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more the partitions, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in spark to limit memory consumption.

^ Probably overkill to bring spark into the equation, but this is one way to do it.

You can use a normal udf (i.e `f.udf()`) instead of a pandas_udf, but apparently that's slower due to java <-> python serialization

11 days ago

fifilura

Pyspark is probably the way to go.

I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.

It is trivial to map the file into Athena.

But you can't connect it to anything else than file output. But it can help you to for example write it to smaller chunks. Or choose another output format such as csv (although arbitrary email content in a csv feels like you are set up for parsing errors).

The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.

11 days ago

calderwoodra

Athena is probably my best bet tbh, especially if I can do a few clicks and just get smaller files. Processing smaller files is a no brainer / pretty easy and could be outsourced to lambda.

11 days ago

fifilura

Yeah the big benefit is that it requires very little setup.

You create a new partitioned table/location from the originally mapped file using a CTAS like so:

  CREATE TABLE new_table_name
  WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://your-bucket/path/to/output/'
  ) AS
  SELECT *
  FROM original_table_name
  PARTITIONED BY partition_column_name
You can probably create a hash and partition by the last character if you want 16 evenly sized partitions. Unless you already have a dimension to partition by.
11 days ago

jcgrillo

It's been a while (~5yr) since I've done anything with Spark, but IIRC it used to be very difficult to make reliable jobs with the Java or Python APIs due to the impedance mismatch between Scala's lazy evaluation semantics and the eager evaluation of Java and Python. I'd encounter perplexing OOMs whenever I tried to use the Python or Java APIs, so I (reluctantly) learned enough Scala to make the Spark go brr and all was well. Is it still like this?

11 days ago

okr

Same for me, the only reason to learn scala was Spark. The Java Api was messy. And still today, i like Scala, well, many functional languages, but for jumping between projects they are a nightmare, as everything is dense and cluttered.

9 days ago

memset

I can help! Emailing you now :)

Our company (scratchdata.com, open source) is literally built to solve the problem of schlepping large amounts of data between sources and destinations, so I have worked on this problem a lot personally and happy to nerd out about what works.

11 days ago

fock

I - by my HPC-background - am wondering quite a bit what happened that 15GB-files are considered large data? Not being a crazy parquet-user, but:

- does this decompress to giant sizes? - can't you split the file easily, because it includes row-based segments? - why does it take months to solve this for one file?

11 days ago

semi-extrinsic

As a fellow HPC user, I tried a couple of years ago to do a tricky data format conversion using these newfangled tools. I was essentially just taking a huge (multi-terabyte) 3D dataset, transposing it and changing the endianness.

The solutions I was able to put together using Dask and Spark and such were all insanely slow, they just got killed by Slurm without getting anywhere. In the end I went back to good ole' shell scripting with xxd to handle most of the heavy lifting. Finished in under an hour.

The appeal of these newfangled tools is that you can work with data sizes that are infeasible to people who only know Excel, yet you don't need to understand a single thing about how your data is actually stored.

If you can be bothered to read the file format specification, open up some files in a hex editor to understand the layout, and write low-level code to parse the data - then you can achieve several orders of magnitude higher performance.

11 days ago

fifilura

I think command line tools is going to be fine if all you do is process one row at a time. Or if your data has a known order.

But if you want to do some kind of grouping or for example pivoting rows to columns, I think you will still benefit from a distributed tool like Spark or Trino. That can do the map/reduce job for you in a distributed way.

10 days ago

memset

Because most people don’t have an HPC background, aren’t familiar with parquet internals, don’t know how to make their language stream data instead of buffering it all in memory, have slow internet connections at home, are running out of disk space on their laptops, and only have 4 GB of ram to work with after Chrome and Slack take up the other 12 GB.

15 GB is a real drag to do anything with. So it’s a real pain when someone says “I’ll just give you 1 TB worth of parquet in S3”, the equivalent of dropping a billion dollars on someone’s doorstep in $1 bills.

11 days ago

vladsanchez

Funny analogy! I loved it. I'm ready to start with ScratchData which btw and respectfully never heard of.

Thanks again for sharing your tool and insightful knowledge.

11 days ago

fifilura

How do you see the competition from Trino and Athena in your case?

Depends a lot on what you want to do with the data of course, but if you want to filter and slice/dice it, my experience is that it is really fast and stable. And if you already have it on s3, the threshold for using it is extremely small.

10 days ago

fock

what is your point? They talked about 15GB of parquet - what does this have to do with 1TB of parquet?

Also: How does the tool you sell here solve the problem - the data is already there and can't be processed (15GB - funny that seems to be the scale of YC startups?)? How does a tool to transfer the data into a new database help here?

10 days ago

wodenokoto

> How does a tool to transfer the data into a new database help here?

Maybe because the problem literally is "how to transfer this data into a database"

10 days ago

jjtheblunt

Parquet is column oriented and so row-based manipulation can be inefficient

11 days ago

fock

> Hierarchically, a file consists of one or more row groups.

https://parquet.apache.org/docs/concepts/

Maybe the file in question only has one row group. Which would be weird, because the creator had to go out of their way to make it happen.

11 days ago

jjtheblunt

Yep. I use it all the time. But, as you said, depends on specific layouts, so can’t expect it to be row-convenient .

11 days ago

martinky24

Wouldn't partial reads fix the RAM problem? e.g. something like this: https://stackoverflow.com/a/69888274

It might not be fast, but a quick 1-off solution that you let run for a while would probably do that job. There shouldn't be a need to load the whole file into memory.

11 days ago

wild_egg

Have you given DuckDB a try? I'm using it to shuttle some hefty data between postgres and some parquet files on S3 and it's only a couple lines. Haven't noted any memory issues so far

11 days ago

csjh

Agreed on DuckDB, fantastic for working with most major data formats

11 days ago

calderwoodra

Took your advice and tried DuckDB. Here's what I've got so far:

```

def _get_duck_db_arrow_results(s3_key):

    con = duckdb.connect(config={'threads': 1, 'memory_limit': '1GB'})
    con.install_extension("aws")
    con.install_extension("httpfs")
    con.load_extension("aws")
    con.load_extension("httpfs")

    con.sql("CALL load_aws_credentials('hadrius-dev', set_region=true);")
    con.sql("CREATE SECRET (TYPE S3,PROVIDER CREDENTIAL_CHAIN);")
    results = con \
        .execute(f"SELECT * FROM read_parquet('{s3_key}');") \
        .fetch_record_batch(1024)
    for index, result in enumerate(results):
        print(index)
    return results
```

I ran the above on a 1.4gb parquet file and 15 min later, all of the results were printed at once. This suggests to me that the whole file was loaded loaded into memory at once.

11 days ago

akdor1154

You asked it to fetch a batch (15min) then iterated over the batch (all at once).

To stream, fetch more batches.

What ddb does to get the batches depends on hand wavey magic around available ram, and also the structure of the parquet.

11 days ago

wild_egg

It's been a long time since I've used python but that sounds like buffering in the library maybe? I use it from Go and it seems to behave differently.

When I'm writing to postgres though I'm doing into entirely inside DuckDB with a `INSERT INTO ... SELECT ...` and that seems to stream it over.

11 days ago

arrowleaf

Using polars in Python I've gotten similar to work, using LazyFrame and collect in streaming mode:

``` df = pl.scan_parquet('tmp/'+DUMP_NAME+'_cleaned.parquet')

with open('tmp/'+DUMP_NAME+'_cleaned.jsonl', mode='w', newline='\n', encoding='utf8') as f: for row in df.collect(streaming=True).iter_rows(named=True): row = {k: v for k, v in row.items() if (v is not None and v != [] and v != '')} f.write(json.dumps(row, default=str) + '\n') ```

11 days ago

akdor1154

This collects all into memory, then iterates.

11 days ago

cycrutchfield

My suggestion is to load each row group individually, as they generally will be much smaller than your total file size. You can do this via pyarrow.ParquetFile.read_row_group. To truly optimize this for reading from s3 you could use fsspec’s open_parquet_file library which would allow you to only load each row group one at a time.

11 days ago

sagia

I had similiar issue, but for aggreagations. Use case was to "compress" large datasets into smaller aggregations for insertion into a costly db. At first we used duckdb but memory became an issue there and we also bumped into a couple of issues with how duckdb handles arrays. We then moved this workload to clickhouse local, which was faster and had more fine tuning options to our liking. in this case was limiting ram usage with i.e. max_bytes_before_external_group_by

11 days ago

392

I'm puzzled as to why this is a problem that has lasted months. My phone has enough RAM to work with this file in memory. Do not use pyspark, it is unbelievably slow and memory hogging if you hold it even slightly wrong. Spark is for tb-sized data, at minimum.

Have you tried downloading the file from s3 to /tmp, opening it with pandas, iterating through 1000 row chunks, pushing to DB? The default DF to SQL built into pandas doesn't batch the inserts so it will be about 10x slower than necessary, but speeding that up is a quick google->SO away.

10 days ago

dijksterhuis

Try pyarrow.ParquetFile.iter_batches()

Streams batches of rows

https://arrow.apache.org/docs/python/generated/pyarrow.parqu...

Edit — May need to do some extra work with s3fs too from what I recall with the default pandas s3 reading

Edit 2 — or check out pyarrow.fs.S3FileSystem :facepalm:

11 days ago

calderwoodra

I've spent many many hours trying these suggestions, didn't have much luck. iter_batches loads the whole file (or some very large amount of it) into memory.

11 days ago

semi-extrinsic

It sounds like maybe your parquet file has no partitioning. Apart from the iterating over row groups like someone else suggested, I suspect there is no better solution than downloading the whole thing to your computer, partitioning it in a sane way, and uploading it again. It's only 15 GB so it should be fine even on an old laptop.

Of course then you might as well do all the processing you're interested in while the file is on your local disk, since it is probably much faster than the cloud service disk.

11 days ago

okr

What do you mean by the parquet file might have no partitioning? Is the row group size not the implicit partitioning?

9 days ago

jfim

Spend a few bucks on an EC2 instance with a few terabytes of RAM for an hour or so. u-3tb1.56xlarge is about $27/hr.

11 days ago

nucleardog

This is the answer for a one-off or occasional problem unless your time is worthless.

$200 to rent a machine that can run the naive solution for an entire day is peanuts compared to the dev time for a “better” solution. Running that machine for eight hours would only cost enough to purchase about a half day of junior engineer time.

11 days ago

orf

Understand the format of your data.

Look at the parquet file metadata: use whatever tool you want for that. The Python parquet library is useful and supports s3.

How big are your row groups? If it’s one large row group then you will run into this issue.

What’s the number of rows in each row group?

11 days ago

bluedemon

Perhaps look into using dlt from https://dlthub.com, using pyarrow or polars. It handles large datasets well, especially when using generators to process the data in chunks.

11 days ago

xk3

You may want to give octosql a try. I was able to read 100GiB parquet files and aggregate them using less than 100MiB RAM and it is pretty fast too

10 days ago

katamaster818

Maybe check out alteryx designer cloud, airbyte, or estuary?

9 days ago

Hikikomori

Should be able to do it with Aws Athena?

11 days ago

Zababa

It's very interesting to see how a new "enterprise open source" project is born. The part where right at the start the author knows that they should have more than one company on board, and how the other companies each contribute their part.

10 days ago

anentropic

Following along the subsequent blog posts in this series progress to Arrow and then to... OpenLineage

I'm curious if anyone has experience with OpenLineage/Marquez or similar they'd like to share

11 days ago

jjgreen

Why is it not in Debian? Is there some deep and dark secret?

https://search.debian.org/cgi-bin/omega?DB=en&P=parquet

13 days ago

pabs3

No-one with the combination of motivation, time and skills needed get it into Debian. Someone wanted to get a Python implementation in, but it looks like they never found the time.

https://bugs.debian.org/838338

These days Debian packaging has become a bit irrelevant, since you can just shove upstream releases into a container and go for it.

11 days ago

mkesper

If you want to depend on curl|bash setups, sure. If you want to get security updates, good packaging is still required.

10 days ago

whinvik

Parquet is a file format. Should a file format be in Debian?

13 days ago

jjgreen

I mean library support for reading and writing it:

  > apt-cache search hdf5 | wc -l
  134
  > apt-cache search netcdf | wc -l
  70
  > apt-cache search parquet | wc -l 
  0
13 days ago

whinvik

Are there Arrow libraries. Feel like a lot of applications that read Parquet actually outsource raw reads to Arrow.

12 days ago

jjtheblunt

look for polars, https://pola.rs

11 days ago

jjtheblunt

Why did you ignore the link i gave you above?

If you follow that link, you'll see polars and parquet are a large highly configurable collection of tools for format manipulations across many HPC formats. Debian maintainers possibly don't want to bundle the entirety, as it would be vast.

Might this help you, though?

https://cloudsmith.io/~opencpn/repos/polar-prod/packages/det...

11 days ago

petre

11 days ago

jjgreen

My question is "why isn't it in Debian?", I ask that since Debian has rather high standards and the absence from Debian suggests some quality issue in available libraries for the format or the format itself.

Are there dark secrets?

11 days ago

jjtheblunt

Could the dark secret you seek be "debian isn't for bleeding edge packages"?

it's very modern and perhaps hasn't been around long enough to have debian maintainers feel it's vetted.

for instance, documentation for Python bindings is more advanced than for Rust bindings, but the package itself uses Rust at the low level.

11 days ago

jjgreen

Parquet is what, 12 years old? Hardly cutting edge. What you say my well be true for polars (I'm not familiar with it), if/when it (or something else) does get packaged I'll give parquet another look ...

11 days ago

petre

Pandas is probably in Debian and it can read parquet files. Polars is fairly new and under active development. It's a python library, I install those in $HOME/.local, as opposed to system wide. One can also install it in a venv. With pip you can also uninstall packages and keep things fairly tidy.

11 days ago

jjgreen

Pandas is in Debian but it cannot read parquet files itself, it uses 3rd party "engines" for that purpose and those are not available in Debian

  Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
    on linux
  Type "help", "copyright", "credits" or "license" for more 
    information.
  >>> import pandas
  >>> pandas.read_parquet('sample3.parquet')
  Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", 
        line 493, in read_parquet
      impl = get_engine(engine)
    File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", 
      line 53, in get_engine
    raise ImportError(
      ImportError: Unable to find a usable engine; tried using: 
      'pyarrow', 'fastparquet'.
    A suitable version of pyarrow or fastparquet is required for 
    parquet support.
11 days ago

petre

Then the options you are left with are either polars via pip or a third party parquet-tools Debian package.

https://github.com/hangxie/parquet-tools/blob/main/USAGE.md#...

11 days ago

jjtheblunt

polars via Rust (cargo) also

10 days ago

jjtheblunt

Yes, i wasn't clear: it's the polars library that's actively changing, so that might be the issue, or just the vast set of optional components configurable on installation, which isn't the normal package manager experience.

FWIW i think i share your general aversion to _not_ using packages, just for the tidiness of installs and removals, though i'm on fedora and macos.

11 days ago

marginalia_nu

Duckdb is probably what you want, though I don't think it's in debian either. It's in Arch though.

11 days ago

mistrial9

pandas is a python-centric, tabular data handler that works well in clouds (and desktop Debian). Pandas can read parquet data today, among other libs mentioned. The binary dot-so driver style is single-host centric and not the emphasis of these cloudy projects (and their cloudy funders)

https://pandas.pydata.org/docs/reference/api/pandas.read_par...

https://packages.debian.org/buster/python3-pandas

Perhaps more alarm is called for when this python+pandas and parquet does not work on Debian, but that is not the case today.

ps- data access in clouds often uses the S3:// endpoint . Contrast to a POSIX endpoint using _fread()_ or similar.. many parquet-aware clients prefer the cloudy, un-POSIX method to access data and that is another reason it is not a simple package in Debian today.

11 days ago

datadrivenangel

Pandas often has significant memory overhead, so it's not uncommon to need ~3-5x the amount of memory as your file size.

Polars and DuckDB are much better about memory management.

11 days ago

marton78

Also, Polars has a sane and well thought out API, unlike the hot streaming mess that is Pandas.

11 days ago

jjgreen

As I understand it, pandas can read parquet if the pyarrow or fastparquet packages are available, but that's not the case and attempts to fix that have been underway for several years.

https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=970021

11 days ago

mistrial9

11 days ago

heroprotagonist

> most basic definition of open source is the code is available for you to read

What crap. That's 'source-available', NOT open-source.

But at least co-option of terminology is an indicator of success.

10 days ago