From 42a409cd59c7f86ea67c2b0976cf1abed0b15d5b Mon Sep 17 00:00:00 2001
From: Chloe He
Date: Fri, 9 Feb 2024 01:01:15 -0800
Subject: [PATCH] docs(blog): add post on stream-batch unification

---
 .../index/execute-results/html.json | 16 +
 .../flink-unified-stream-batch/index.qmd | 388 ++++++++++++++++++
 2 files changed, 404 insertions(+)
 create mode 100644 docs/_freeze/posts/flink-unified-stream-batch/index/execute-results/html.json
 create mode 100644 docs/posts/flink-unified-stream-batch/index.qmd

diff --git a/docs/_freeze/posts/flink-unified-stream-batch/index/execute-results/html.json b/docs/_freeze/posts/flink-unified-stream-batch/index/execute-results/html.json
new file mode 100644
index 0000000000000..b74473d230def
--- /dev/null
+++ b/docs/_freeze/posts/flink-unified-stream-batch/index/execute-results/html.json
@@ -0,0 +1,16 @@
+{
+  "hash": "2a587956b0ac8b85fcd59afecdbb693a",
+  "result": {
+    "engine": "jupyter",
+    "markdown": "---\ntitle: \"Stream-batch unification through Ibis lens\"\nauthor: \"Chloe\"\ndate: 2024-02-08\ncategories:\n - blog\n - flink\n - streaming\n---\n\nOne of my focuses in the past 10 months has been to implement the Flink backend\nfor Ibis. I was working with Apache Flink and building a feature engineering\ntool, and we stumbled upon Ibis as we attempted to build our own translation\nlayer that could turn user declarations into relation trees, then optimize and\ndeploy the query plan, all while maintaining the underlying infrastructure for\nthe user. We considered and prototyped with a number of tools and eventually\nchose Ibis. It had already established a position in the batch world and had\nsupport for 10+ of the most popular batch engines (at the time). We loved the\nidea of decoupling the user-facing interface from the execution engine, so that\n users can swap out the execution engine depending on their needs, without\n having to rewrite code. And, of course, it was open-source. 
It was everything\n we dreamed of.\n\nA few months later, [we started introducing Apache Flink as the first streaming\nbackend into Ibis](https://github.com/ibis-project/ibis/pull/6408). We saw so\nmuch more that Ibis can do when it steps outside of batch.\n\nIbis 8.0 marks the official launch of the first streaming backends in Ibis\n(Apache Flink and RisingWave). This is a very significant milestone in Ibis\ndevelopment.\n\nYou may be wondering: what does this mean? Why is this such a big deal? I will\nbe answering these questions in this blog post.\n\n## Ibis combines stream and batch into a single framework beyond version 8.0\n\nToday, Ibis provides support for close to 20 backends, including Dask, DuckDB,\nPostgreSQL, PySpark, Snowflake, and others. However - before the introduction\nof Flink and RisingWave backends - all of the supported backends derive from a\nbatch paradigm (aside from Spark, which does offer support for stream\nprocessing, albeit using micro-batches underneath the hood).\n\nThis means that Ibis is an extremely valuable tool, but it was limited to batch\nworkloads. In the case of streaming workloads, where systems are [designed with\nunbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),\nthe batch-oriented Ibis fell short. To deal with an infinite data stream,\nstreaming data systems operate with unique concepts such as “event time”,\n“processing time”, “watermark”, etc. All of these were missing from Ibis.\n\nAt the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,\netc) have been gaining use cases. It drove the development of more mature\ntechnologies as well as new approaches to close the gap between batch and\nstreaming worlds. 
[Flink SQL, for example, was born as a part of such effort\nand, through allowing users to write streaming engines in a SQL-like manner,\nhave been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)\nThe success of Flink SQL both validates the potential of stream and batch\nunification and inspires the community to push for better standards, a vision\nthat Ibis is at a unique and valuable position to help build.\n\n## Why is batch-stream unification significant?\n\nFirstly, large companies that have both batch and streaming workloads often\ndeploy\n[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).\nIn a Lambda infrastructure, batch and streaming pipelines are separate, which\nrequires two codebases to be set up and maintained. If you’re a platform\nengineer, you have probably found yourself trying to duplicate batch workloads\n“in streaming code” and vice versa. If you have backfilled a streaming pipeline\ndue to a bug and needed to reimplement the logic on a batch pipeline, you know\nhow painful that all is :(\n\n[LinkedIn successfully reduced processing time by 94% and resource utilization\nby 50% after switching from a Lambda architecture to unified batch and\nstreaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)\nA unified system also massively increased engineer productivity because they no\nlonger needed to develop and maintain separate codebases for different\nenvironments.\n[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),\n[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),\nand [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted\nsimilar solutions.\n\nSecondly, in the world of machine learning, it’s common for data scientists to\ndevelop locally and experiment with a 
sampled, batch dataset in Python. If the\nresults look promising, the features and models would then be deployed into\nproduction. Oftentimes, there is a code handover in this process, and a\ndedicated team of developers would be responsible for rewriting the logic for\nproduction, as a streaming workload.\n\nIn both cases, there is a huge amount of technical overhead. If there is a\nstreamlined architecture, using a unified API, much of this overhead can be\navoided. As a platform engineer, you no longer need to worry about maintaining\ntwo separate architectures and codebases. As a data scientist or a machine\nlearning engineer, you can write one single workload that can execute both on\nbatch and streaming backends. Wouldn’t that be amazing?\n\n## Ibis unifies batch and streaming\n\nEnter Ibis. Ibis unifies batch and streaming with a single API. It decouples\nthe dataframe API from backend execution, so that the logic for defining data\ntransformations is unaffected by implementation discrepancies across backend\nengines. There is also an ongoing effort to further increase interoperability\nacross different languages and systems via a standard query plan intermediate\nrepresentation (IR), using a library called\n[`Substrait`](https://substrait.io/).\n\nWhat does this actually look like? For example, Ibis allows users to define\nwindow aggregations using the [`over()`\nmethod](https://ibis-project.org/reference/expression-tables#ibis.expr.types.groupby.GroupedTable.over).\nWhen executed on the Flink backend, this translates into [Flink’s over\naggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)\nand outputs an aggregated value for every input row over a range of ordered\nrows. On streaming data, aggregation results are continuously computed and\nwritten into data sinks (e.g., Kafka, Redis) as records are received at and\nconsumed from the upstream data source (e.g., Kafka, Change Data Capture). 
In\npandas, the conceptual analog is [windowing\noperation](https://pandas.pydata.org/docs/user_guide/window.html). Results are\ncomputed by looking back the length of the window from the current observation,\nbut can be computed all at once because batch data is static.\n\nAnother great example is deduplication. In Flink SQL, this looks something like this:\n\n```sql\nSELECT [column_list]\nFROM (\n SELECT [column_list],\n ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]\n ORDER BY time_attr [asc|desc]) AS rownum\n FROM table_name)\nWHERE rownum = 1\n```\nIn a database like Postgres, this could be as simple as\n```sql\nSELECT DISTINCT t0.`string_col`, t0.`int_col`\nFROM functional_alltypes t0\n```\nAnd in pandas, you would use the method `drop_duplicates()`:\n```python\ndf.drop_duplicates()\n```\n\n::: {.callout-note}\nWe’re working on supporting deduplication via `distinct()` in Flink backend and\nthis feature should be available before the next major release!\n:::\n\nThese underlying discrepancies are abstracted in such a way that you, as an\nIbis user, will no longer find yourself struggling with bugs that are the\nresult of subtleties across different engines and dialects. Need to rewrite\nyour batch workload as a streaming one or vice versa? Rest assured, Ibis has\nyou covered!\n\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). 
You can [download and install Docker Compose from the official\nwebsite](https://docs.docker.com/compose/install/).\n* JDK 11 release: Flink requires Java 11.\n* Python 3.9 or 3.10.\n* Follow [the setup tutorial](https://ibis-project.org/tutorials/open-source-software/apache-flink/0_setup)\nto install the Flink backend for Ibis.\n* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example).\n:::\n\nFirst, spin up the Docker containers by running `docker compose up kafka\ninit-kafka data-generator`. This will set up a mocked Kafka source that\ncontains records that look like the following:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6,\n}\n```\n\nThis is a streaming data source. Commonly, to experiment with the data, we\nwould extract a chunk of the data and load it in batch:\n\n\n::: {#2354ae88 .cell execution_count=2}\n``` {.python .cell-code}\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\", auto_offset_reset=\"earliest\")\nrows = []\nfor _, msg in zip(range(100), consumer):\n rows.append(msg)\n```\n:::\n\n\nThis is a tabular dataset and we can convert it into a pandas DataFrame:\n\n::: {#ee59c4af .cell execution_count=3}\n``` {.python .cell-code}\nimport json\n\nimport pandas as pd\n\ndf = pd.DataFrame([json.loads(row.value) for row in rows])\ndf[\"createTime\"] = pd.to_datetime(df[\"createTime\"])\ndf\n```\n\n::: {.cell-output .cell-output-display execution_count=3}\n```{=html}\n
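<div>\n<table>\n  <thead>\n    <tr><th></th><th>createTime</th><th>orderId</th><th>payAmount</th><th>payPlatform</th><th>provinceId</th></tr>\n  </thead>\n  <tbody>\n    <tr><th>0</th><td>2024-02-09 08:36:45.469</td><td>1707467806</td><td>21810.194081</td><td>0</td><td>5</td></tr>\n    <tr><th>1</th><td>2024-02-09 08:36:45.978</td><td>1707467807</td><td>76957.216214</td><td>0</td><td>4</td></tr>\n    <tr><th>2</th><td>2024-02-09 08:36:46.481</td><td>1707467808</td><td>55564.862926</td><td>0</td><td>4</td></tr>\n    <tr><th>3</th><td>2024-02-09 08:36:46.984</td><td>1707467809</td><td>49171.357039</td><td>0</td><td>1</td></tr>\n    <tr><th>4</th><td>2024-02-09 08:36:47.489</td><td>1707467810</td><td>66983.389438</td><td>0</td><td>2</td></tr>\n    <tr><th>...</th><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n    <tr><th>95</th><td>2024-02-09 08:37:33.281</td><td>1707467901</td><td>88212.114405</td><td>0</td><td>0</td></tr>\n    <tr><th>96</th><td>2024-02-09 08:37:33.784</td><td>1707467902</td><td>90708.220017</td><td>0</td><td>5</td></tr>\n    <tr><th>97</th><td>2024-02-09 08:37:34.286</td><td>1707467903</td><td>54984.928481</td><td>0</td><td>2</td></tr>\n    <tr><th>98</th><td>2024-02-09 08:37:34.788</td><td>1707467904</td><td>1599.134811</td><td>0</td><td>4</td></tr>\n    <tr><th>99</th><td>2024-02-09 08:37:35.289</td><td>1707467905</td><td>3242.779040</td><td>0</td><td>0</td></tr>\n  </tbody>\n</table>\n<p>100 rows × 5 columns</p>\n</div>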
\n```\n:::\n:::\n\n\nWe can connect to this DataFrame in ibis:\n\n::: {#5fac1658 .cell execution_count=4}\n``` {.python .cell-code}\nimport ibis\n\ncon = ibis.pandas.connect()\nt = con.create_table(\"payments\", df)\nt\n```\n\n::: {.cell-output .cell-output-display execution_count=4}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n
\n```\n:::\n:::\n\n\nThis is a series of records of order transactions. At Company Potclay, we have\njust deployed a new ad campaign, which is A/B tested by province, and we’re\ninterested in the effectiveness of this ad campaign by monitoring data\ndistribution shift over time. A crucial feature is the total transaction amount\nover the past minute, stratified by province. We would like to first experiment\nwriting this feature on a smaller set of batch data. After we make sure that\nthe logic looks correct and handles all edge cases appropriately, we want to\ndeploy this as a streaming workload.\n\nHere is how we would define this feature in Ibis code:\n\n::: {#7718d866 .cell execution_count=5}\n``` {.python .cell-code}\nagged = t[\n \"provinceId\",\n t.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=t.createTime)\n .name(\"pay_amount\"),\n]\nagged\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
r0 := DatabaseTable: payments\n  createTime  timestamp\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n\nSelection[r0]\n  selections:\n    provinceId: r0.provinceId\n    pay_amount: WindowFunction(func=Sum(r0.payAmount), frame=RangeWindowFrame(table=r0, \nstart=WindowBoundary(value=10 s, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc \nr0.createTime]))\n
\n```\n:::\n:::\n\n\nCarrying out the computations using `pandas` backend is as simple as:\n\n::: {#5d756441 .cell execution_count=6}\n``` {.python .cell-code}\nagged.to_pandas()\n```\n\n::: {.cell-output .cell-output-display execution_count=6}\n```{=html}\n
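<div>\n<table>\n  <thead>\n    <tr><th></th><th>provinceId</th><th>pay_amount</th></tr>\n  </thead>\n  <tbody>\n    <tr><th>0</th><td>5</td><td>2.181019e+04</td></tr>\n    <tr><th>1</th><td>4</td><td>9.876741e+04</td></tr>\n    <tr><th>2</th><td>4</td><td>1.543323e+05</td></tr>\n    <tr><th>3</th><td>1</td><td>2.035036e+05</td></tr>\n    <tr><th>4</th><td>2</td><td>2.704870e+05</td></tr>\n    <tr><th>...</th><td>...</td><td>...</td></tr>\n    <tr><th>95</th><td>0</td><td>1.045234e+06</td></tr>\n    <tr><th>96</th><td>5</td><td>1.085570e+06</td></tr>\n    <tr><th>97</th><td>2</td><td>1.105781e+06</td></tr>\n    <tr><th>98</th><td>4</td><td>1.032541e+06</td></tr>\n    <tr><th>99</th><td>0</td><td>1.035244e+06</td></tr>\n  </tbody>\n</table>\n<p>100 rows × 2 columns</p>\n</div>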
\n```\n:::\n:::\n\n\nFor local experimentation purposes, this DataFrame only consists of 100 rows,\nso doing this in memory is easy.\n\nThe outputs look correct and we didn’t run into any errors. We are now ready to\ndeploy this as a streaming job in Flink!\n\nFirst, let’s set up the Flink environment and connect to this Kafka source:\n\n::: {.callout-note}\nKafka connector is not part of the binary distribution, so we need to download\nand link it for cluster execution explicitly:\n\n\n:::\n\n::: {#0ffbce8e .cell execution_count=8}\n``` {.python .cell-code}\nimport ibis.expr.schema as sch\nimport ibis.expr.datatypes as dt\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.common import Configuration\n\nsource_schema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\n\ntable_config = table_env.get_config()\nconfig = Configuration()\nconfig.set_string(\"parallelism.default\", \"1\")\ntable_config.add_configuration(config)\n\nconnection = ibis.flink.connect(table_env)\n\n# add the JAR downloaded above\nconnection.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'\")\n\nsource_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"payment_msg\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"properties.group.id\": \"test_3\",\n \"scan.startup.mode\": \"earliest-offset\",\n \"format\": \"json\",\n}\n\nt = connection.create_table(\n \"payment_msg\",\n schema=source_schema,\n tbl_properties=source_configs,\n watermark=ibis.watermark(\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15)\n ),\n)\n```\n:::\n\n\nHow would we write this in Flink SQL? 
Ibis makes this extremely easy by\nexposing a `compile()` API:\n\n::: {#b642748b .cell execution_count=9}\n``` {.python .cell-code}\nagged = t[\n \"provinceId\",\n t.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=t.createTime)\n .name(\"pay_amount\"),\n]\nsql = connection.compile(agged)\nprint(sql)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nSELECT t0.`provinceId`,\n sum(t0.`payAmount`) OVER (ORDER BY t0.`createTime` ASC RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS `pay_amount`\nFROM payment_msg t0\n```\n:::\n:::\n\n\nBefore we can execute this query, we need to first define a data sink where the\nresults can be written:\n\n::: {#20cc49d8 .cell execution_count=10}\n``` {.python .cell-code}\nsink_schema = sch.Schema(\n {\n \"province_id\": dt.int32,\n \"pay_amount\": dt.float64,\n }\n)\n\nkafka_sink_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"sink\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"kafka_sink\", schema=sink_schema, tbl_properties=kafka_sink_configs\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=10}\n```{=html}\n
DatabaseTable: kafka_sink\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\nNow, let’s write the results into this sink:\n\n::: {#bd5cb22f .cell execution_count=11}\n``` {.python .cell-code}\nconnection.insert(\"kafka_sink\", agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=11}\n```\n\n```\n:::\n:::\n\n\n::: {.callout-tip}\nYou can examine the results either using the Kafka console consumer CLI or the\n`kafka-python` library.\n:::\n\nHow easy was it to define both batch and streaming workloads using Ibis?\nWithout Ibis, you would have needed to write a pandas workload and then convert\nit into Flink SQL manually.\n\n## Concluding thoughts\n\nWith the introduction of the first streaming backends, Ibis is now both a batch\nand a streaming Python DataFrame API and we’re excited about what’s to come\nnext. We hope that Ibis can close the gap between batch and streaming in such a\nway that we no longer talk about the two separately, but, rather, as two parts\nof the same paradigm. Streaming naturally lends itself to batch: batch is\ntechnically just a special case of streaming, where the unbounded data flow\nstops at some point.\n\nOf course, this is only the beginning. 
There are still technical challenges to\nbe solved (e.g., backfill, window computations over large windows, GPU\nacceleration), and we hope we'll have more exciting updates to share with the\ncommunity soon!\n\nCheck out the new Apache Flink and RisingWave backends and let us know what you\nthink!\n\n",
+    "supporting": [
+      "index_files"
+    ],
+    "filters": [],
+    "includes": {
+      "include-in-header": [
+        "\n\n\n"
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/docs/posts/flink-unified-stream-batch/index.qmd b/docs/posts/flink-unified-stream-batch/index.qmd
new file mode 100644
index 0000000000000..d111f46065d76
--- /dev/null
+++ b/docs/posts/flink-unified-stream-batch/index.qmd
@@ -0,0 +1,388 @@
+---
+title: "Stream-batch unification through Ibis lens"
+author: "Chloe"
+date: 2024-02-08
+categories:
+    - blog
+    - flink
+    - streaming
+---
+
+One of my focuses in the past 10 months has been to implement the Flink backend
+for Ibis. I was working with Apache Flink and building a feature engineering
+tool, and we stumbled upon Ibis as we attempted to build our own translation
+layer that could turn user declarations into relation trees, then optimize and
+deploy the query plan, all while maintaining the underlying infrastructure for
+the user. We considered and prototyped with a number of tools and eventually
+chose Ibis. It had already established a position in the batch world and had
+support for 10+ of the most popular batch engines (at the time). We loved the
+idea of decoupling the user-facing interface from the execution engine, so that
+ users can swap out the execution engine depending on their needs, without
+ having to rewrite code. And, of course, it was open-source. It was everything
+ we dreamed of.
+
+A few months later, [we started introducing Apache Flink as the first streaming
+backend into Ibis](https://github.com/ibis-project/ibis/pull/6408). We saw so
+much more that Ibis can do when it steps outside of batch.
+
+Ibis 8.0 marks the official launch of the first streaming backends in Ibis
+(Apache Flink and RisingWave). This is a very significant milestone in Ibis
+development.
+
+You may be wondering: what does this mean? Why is this such a big deal? I will
+be answering these questions in this blog post.
+
+## Ibis combines stream and batch into a single framework beyond version 8.0
+
+Today, Ibis provides support for close to 20 backends, including Dask, DuckDB,
+PostgreSQL, PySpark, Snowflake, and others. However, before the introduction
+of the Flink and RisingWave backends, all of the supported backends derived from
+a batch paradigm (aside from Spark, which does offer support for stream
+processing, albeit using micro-batches under the hood).
+
+This meant that Ibis, although an extremely valuable tool, was limited to batch
+workloads. In the case of streaming workloads, where systems are [designed with
+unbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),
+the batch-oriented Ibis fell short. To deal with an infinite data stream,
+streaming data systems operate with unique concepts such as “event time”,
+“processing time”, “watermark”, etc. All of these were missing from Ibis.
+
+At the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,
+etc.) have been gaining use cases. This has driven the development of more mature
+technologies as well as new approaches to close the gap between batch and
+streaming worlds. [Flink SQL, for example, was born as a part of this effort
+and, by allowing users to write streaming jobs in a SQL-like manner,
+has been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)
+The success of Flink SQL both validates the potential of stream and batch
+unification and inspires the community to push for better standards, a vision
+that Ibis is in a unique and valuable position to help build.
+
+## Why is batch-stream unification significant?
+
+Firstly, large companies that have both batch and streaming workloads often
+deploy a
+[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).
+In a Lambda infrastructure, batch and streaming pipelines are separate, which
+requires two codebases to be set up and maintained. If you’re a platform
+engineer, you have probably found yourself trying to duplicate batch workloads
+“in streaming code” and vice versa. If you have backfilled a streaming pipeline
+due to a bug and needed to reimplement the logic on a batch pipeline, you know
+how painful that all is :(
+
+[LinkedIn successfully reduced processing time by 94% and resource utilization
+by 50% after switching from a Lambda architecture to unified batch and
+streaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)
+A unified system also massively increased engineer productivity because they no
+longer needed to develop and maintain separate codebases for different
+environments.
+[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),
+[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),
+and [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted
+similar solutions.
+
+Secondly, in the world of machine learning, it’s common for data scientists to
+develop locally and experiment with a sampled, batch dataset in Python. If the
+results look promising, the features and models would then be deployed into
+production. Oftentimes, there is a code handover in this process, and a
+dedicated team of developers would be responsible for rewriting the logic for
+production, as a streaming workload.
+
+In both cases, there is a huge amount of technical overhead. If there is a
+streamlined architecture, using a unified API, much of this overhead can be
+avoided. As a platform engineer, you no longer need to worry about maintaining
+two separate architectures and codebases. As a data scientist or a machine
+learning engineer, you can write one single workload that can execute both on
+batch and streaming backends. Wouldn’t that be amazing?
+
+## Ibis unifies batch and streaming
+
+Enter Ibis. Ibis unifies batch and streaming with a single API. It decouples
+the dataframe API from backend execution, so that the logic for defining data
+transformations is unaffected by implementation discrepancies across backend
+engines. There is also an ongoing effort to further increase interoperability
+across different languages and systems via a standard query plan intermediate
+representation (IR), using a specification called
+[`Substrait`](https://substrait.io/).
+
+What does this actually look like? For example, Ibis allows users to define
+window aggregations using the [`over()`
+method](https://ibis-project.org/reference/expression-tables#ibis.expr.types.groupby.GroupedTable.over).
+When executed on the Flink backend, this translates into [Flink’s over
+aggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)
+and outputs an aggregated value for every input row over a range of ordered
+rows. On streaming data, aggregation results are continuously computed and
+written into data sinks (e.g., Kafka, Redis) as records are received at and
+consumed from the upstream data source (e.g., Kafka, Change Data Capture). In
+pandas, the conceptual analog is a [windowing
+operation](https://pandas.pydata.org/docs/user_guide/window.html). Results are
+computed by looking back the length of the window from the current observation,
+but can be computed all at once because batch data is static.
+
+Another great example is deduplication. In Flink SQL, this looks something like this:
+
+```sql
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY time_attr [asc|desc]) AS rownum
+   FROM table_name)
+WHERE rownum = 1
+```
+In a database like Postgres, this could be as simple as
+```sql
+SELECT DISTINCT t0.string_col, t0.int_col
+FROM functional_alltypes t0
+```
+And in pandas, you would use the method `drop_duplicates()`:
+```python
+df.drop_duplicates()
+```
+
+::: {.callout-note}
+We’re working on supporting deduplication via `distinct()` in the Flink backend and
+this feature should be available before the next major release!
+:::
+
+These underlying discrepancies are abstracted in such a way that you, as an
+Ibis user, will no longer find yourself struggling with bugs that are the
+result of subtleties across different engines and dialects. Need to rewrite
+your batch workload as a streaming one or vice versa? Rest assured, Ibis has
+you covered!
+
+## See it in action
+
+Now, let’s walk through a code example together to see how simple this
+experience is!
+
+::: {.callout-note}
+Prerequisites for running this example:
+
+* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka
+environment (including sample data generation) and a Flink cluster (for remote
+execution). You can [download and install Docker Compose from the official
+website](https://docs.docker.com/compose/install/).
+* JDK 11 release: Flink requires Java 11.
+* Python 3.9 or 3.10.
+* Follow [the setup tutorial](https://ibis-project.org/tutorials/open-source-software/apache-flink/0_setup)
+to install the Flink backend for Ibis.
+* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example).
+:::
+
+First, spin up the Docker containers by running `docker compose up kafka
+init-kafka data-generator`. This will set up a mocked Kafka source that
+contains records that look like the following:
+
+```json
+{
+    "createTime": "2023-09-20 22:19:02.224",
+    "orderId": 1695248388,
+    "payAmount": 88694.71922270155,
+    "payPlatform": 0,
+    "provinceId": 6
+}
+```
+
+This is a streaming data source. Commonly, to experiment with the data, we
+would extract a chunk of the data and load it in batch:
+```{python}
+# | code-fold: true
+# | include: false
+!pip install apache-flink
+!git clone https://github.com/ibis-project/ibis-flink-example
+!cd ibis-flink-example && docker compose up kafka init-kafka data-generator -d && sleep 10 && cd ..
+```
+
+```{python}
+from kafka import KafkaConsumer
+
+consumer = KafkaConsumer("payment_msg", auto_offset_reset="earliest")
+rows = []
+for _, msg in zip(range(100), consumer):  # keep only the first 100 messages
+    rows.append(msg)
+```
+
+This is a tabular dataset and we can convert it into a pandas DataFrame:
+
+```{python}
+import json
+
+import pandas as pd
+
+df = pd.DataFrame([json.loads(row.value) for row in rows])
+df["createTime"] = pd.to_datetime(df["createTime"])
+df
+```
+
+We can connect to this DataFrame in Ibis:
+
+```{python}
+import ibis
+
+con = ibis.pandas.connect()
+t = con.create_table("payments", df)
+t
+```
+
+This is a series of records of order transactions. At Company Potclay, we have
+just deployed a new ad campaign, which is A/B tested by province, and we’re
+interested in the effectiveness of this ad campaign by monitoring data
+distribution shift over time. A crucial feature is the total transaction amount
+over the past 10 seconds, stratified by province. We would like to first experiment
+writing this feature on a smaller set of batch data. After we make sure that
+the logic looks correct and handles all edge cases appropriately, we want to
+deploy this as a streaming workload.
+
+Here is how we would define this feature in Ibis code:
+
+```{python}
+agged = t[
+    "provinceId",
+    t.payAmount.sum()
+    .over(range=(-ibis.interval(seconds=10), 0), order_by=t.createTime)
+    .name("pay_amount"),
+]
+agged
+```
+
+Carrying out the computations using the `pandas` backend is as simple as:
+
+```{python}
+agged.to_pandas()
+```
+
+For local experimentation purposes, this DataFrame only consists of 100 rows,
+so doing this in memory is easy.
+
+The outputs look correct and we didn’t run into any errors. We are now ready to
+deploy this as a streaming job in Flink!
+
+First, let’s set up the Flink environment and connect to this Kafka source:
+
+::: {.callout-note}
+The Kafka connector is not part of the binary distribution, so we need to download
+and link it for cluster execution explicitly:
+```{python}
+# | code-fold: true
+# | include: false
+!wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
+```
+:::
+
+```{python}
+import ibis.expr.schema as sch
+import ibis.expr.datatypes as dt
+from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.common import Configuration
+
+source_schema = sch.Schema(
+    {
+        "createTime": dt.timestamp(scale=3),
+        "orderId": dt.int64,
+        "payAmount": dt.float64,
+        "payPlatform": dt.int32,
+        "provinceId": dt.int32,
+    }
+)
+
+env_settings = EnvironmentSettings.in_streaming_mode()
+table_env = TableEnvironment.create(env_settings)
+
+table_config = table_env.get_config()
+config = Configuration()
+config.set_string("parallelism.default", "1")
+table_config.add_configuration(config)
+
+connection = ibis.flink.connect(table_env)
+
+# add the JAR downloaded above
+connection.raw_sql("ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'")
+
+source_configs = {
+    "connector": "kafka",
+    "topic": "payment_msg",
+    "properties.bootstrap.servers": "localhost:9092",
+    "properties.group.id": "test_3",
+    "scan.startup.mode": "earliest-offset",
+    "format": "json",
+}
+
+t = connection.create_table(
+    "payment_msg",
+    schema=source_schema,
+    tbl_properties=source_configs,
+    watermark=ibis.watermark(
+        time_col="createTime", allowed_delay=ibis.interval(seconds=15)
+    ),
+)
+```
+
+How would we write this in Flink SQL? Ibis makes this extremely easy by
+exposing a `compile()` API:
+
+```{python}
+agged = t[
+    "provinceId",
+    t.payAmount.sum()
+    .over(range=(-ibis.interval(seconds=10), 0), order_by=t.createTime)
+    .name("pay_amount"),
+]
+sql = connection.compile(agged)
+print(sql)
+```
+
+Before we can execute this query, we need to first define a data sink where the
+results can be written:
+
+```{python}
+sink_schema = sch.Schema(
+    {
+        "province_id": dt.int32,
+        "pay_amount": dt.float64,
+    }
+)
+
+kafka_sink_configs = {
+    "connector": "kafka",
+    "topic": "sink",
+    "properties.bootstrap.servers": "localhost:9092",
+    "format": "json",
+}
+
+connection.create_table(
+    "kafka_sink", schema=sink_schema, tbl_properties=kafka_sink_configs
+)
+```
+
+Now, let’s write the results into this sink:
+
+```{python}
+connection.insert("kafka_sink", agged)
+```
+
+::: {.callout-tip}
+You can examine the results either using the Kafka console consumer CLI or the
+`kafka-python` library.
+:::
+
+How easy was it to define both batch and streaming workloads using Ibis?
+Without Ibis, you would have needed to write a pandas workload and then convert
+it into Flink SQL manually.
+
+## Concluding thoughts
+
+With the introduction of the first streaming backends, Ibis is now both a batch
+and a streaming Python DataFrame API and we’re excited about what’s to come
+next. We hope that Ibis can close the gap between batch and streaming in such a
+way that we no longer talk about the two separately, but, rather, as two parts
+of the same paradigm. Streaming naturally lends itself to batch: batch is
+technically just a special case of streaming, where the unbounded data flow
+stops at some point.
+
+Of course, this is only the beginning. There are still technical challenges to
+be solved (e.g., backfill, window computations over large windows, GPU
+acceleration), and we hope we'll have more exciting updates to share with the
+community soon!
+
+Check out the new Apache Flink and RisingWave backends and let us know what you
+think!
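
Note on the windowed feature in `index.qmd`: the `over(range=(-ibis.interval(seconds=10), 0), order_by=t.createTime)` call sums `payAmount` over a trailing 10-second range for every row, and it is given no grouping key. The sketch below reproduces that range-window semantics in plain Python on the first five rows of the frozen DataFrame output, and also shows what a per-province variant would return for the same rows. `ROWS`, `trailing_sum`, and the `by_province` flag are hypothetical names made up for this illustration; they are not Ibis or Flink API, and this is a sketch under those assumptions rather than how either engine computes the result.

```python
from datetime import datetime, timedelta

# First five sample rows as shown in the frozen DataFrame output:
# (createTime, provinceId, payAmount)
ROWS = [
    ("2024-02-09 08:36:45.469", 5, 21810.194081),
    ("2024-02-09 08:36:45.978", 4, 76957.216214),
    ("2024-02-09 08:36:46.481", 4, 55564.862926),
    ("2024-02-09 08:36:46.984", 1, 49171.357039),
    ("2024-02-09 08:36:47.489", 2, 66983.389438),
]


def trailing_sum(rows, window=timedelta(seconds=10), by_province=False):
    """For each row, sum payAmount over rows with createTime in [t - window, t].

    With by_province=True, only rows sharing the current row's provinceId
    contribute (a hypothetical partitioned variant of the same window).
    """
    parsed = [
        (datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f"), province, amount)
        for ts, province, amount in rows
    ]
    totals = []
    for t, province, _ in parsed:
        total = sum(
            amount
            for other_t, other_province, amount in parsed
            if t - window <= other_t <= t
            and (not by_province or other_province == province)
        )
        totals.append(round(total, 6))
    return totals


unpartitioned = trailing_sum(ROWS)
per_province = trailing_sum(ROWS, by_province=True)
print(unpartitioned)
print(per_province)
```

With `by_province=False`, the values agree with the first five `pay_amount` entries in the frozen output up to display rounding. In Ibis, the per-province variant would presumably be expressed by passing a grouping key to `over()`; that is an assumption and is not part of this patch.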