This project contains a couple of tools to analyze data around the Apache Flink community, including:
- the commit history of the Apache Flink open source project,
- the pull requests to its repository on GitHub, and
- messages on the user and developer mailing lists, which also contain information on newly created Jira tickets (on the `flink-dev` mailing list).
While there are a couple of sub-projects, the analytics fall into two categories:
- the original Flink Repository Analytics (via the DataStream API), and
- SQL analytics on any of the data listed above.
This Apache Flink application analyzes the commit history of the Apache Flink open source project to determine each component's activity over time. It pulls data live from the GitHub API via a custom Flink source and writes it to Elasticsearch.
It is the running example for a blog post series on Ververica Platform Community Edition.
The entry point class is `com.ververica.platform.FlinkCommitProgram` in the `commit-analytics` sub-project.
This application accepts the following command-line arguments of the form `--key value`:
| Parameter | Description | Default |
|---|---|---|
| `es-host` | Elasticsearch host | `elasticsearch-master-headless.vvp.svc` |
| `es-port` | Elasticsearch port | `9200` |
| `start-date` | The application will process the commit history of Apache Flink starting from this date. Examples: `2019-01`, `2019-01-01`, `2020-01-01T10:30Z`, `2020-01-01T10:30:00Z`, `2020-01-01T10:30:00.000Z` | now |
| `enable-new-feature` | A feature flag for enabling a new job feature | unset |
| `poll-interval-ms` | Minimum pause between polling commits via the GitHub API (in milliseconds) | `1000` |
| `checkpointing-interval-ms` | Apache Flink checkpointing interval (in milliseconds) | `10000` |
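As a rough illustration of how the `--key value` arguments and the defaults in the table fit together, here is a minimal, hypothetical parser in plain Java. The class name `CommitProgramArgs` is made up for this example, and treating `enable-new-feature` as switched on whenever it is present is an assumption; a real Flink job would more likely rely on Flink's `ParameterTool.fromArgs(args)` for this.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not the project's code): parses "--key value" arguments and falls
 * back to the defaults documented for FlinkCommitProgram.
 */
final class CommitProgramArgs {
  private final Map<String, String> values = new HashMap<>();

  CommitProgramArgs(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if (!args[i].startsWith("--")) {
        throw new IllegalArgumentException("Expected --key but got: " + args[i]);
      }
      String key = args[i].substring(2);
      // A key directly followed by another "--key" (or by nothing) is stored as a flag.
      boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
      values.put(key, hasValue ? args[++i] : "");
    }
  }

  String esHost() {
    return values.getOrDefault("es-host", "elasticsearch-master-headless.vvp.svc");
  }

  int esPort() {
    return Integer.parseInt(values.getOrDefault("es-port", "9200"));
  }

  long pollIntervalMs() {
    return Long.parseLong(values.getOrDefault("poll-interval-ms", "1000"));
  }

  long checkpointingIntervalMs() {
    return Long.parseLong(values.getOrDefault("checkpointing-interval-ms", "10000"));
  }

  // Assumption: the feature flag counts as enabled whenever it is present.
  boolean newFeatureEnabled() {
    return values.containsKey("enable-new-feature");
  }
}
```

For example, `--es-port 9300 --enable-new-feature` would yield port 9300, the default host, and the feature flag switched on.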
The SQL analytics functionality is split into two sub-projects:
- `import`, to copy Flink repository information, mailing list contents, and pull request information into Kafka, and
- `sql-functions`, to offer a few helpers useful for a variety of SQL queries.
The `import` sub-project contains three jobs to import data from various public sources around the development of Apache Flink:
- `com.ververica.platform.FlinkCommitsToKafka`
- `com.ververica.platform.FlinkMailingListToKafka`
  - This will import the following Apache Flink mailing lists: `flink-dev`, `flink-user`, and `flink-user-zh`.
- `com.ververica.platform.FlinkPullRequestsToKafka`
These jobs use source implementations built on the DataStream API and use the Table API to write the resulting elements to Kafka.
Each application accepts the following command-line arguments of the form `--key value`:
| Parameter | Description | Default |
|---|---|---|
| `kafka-server` | Kafka bootstrap server | `kafka.vvp.svc` |
| `kafka-topic` | Kafka topic to write to (`FlinkMailingListToKafka` will use this as the prefix) | `flink-commits` |
| `start-date` | The application will process its input starting from this date. Examples: `2019-01`, `2019-01-01`, `2020-01-01T10:30Z`, `2020-01-01T10:30:00Z`, `2020-01-01T10:30:00.000Z` | now |
| `poll-interval-ms` | Minimum pause between polling input data after reaching the current date and time (in milliseconds) | `10000` |
| `checkpointing-interval-ms` | Apache Flink checkpointing interval (in milliseconds) | `10000` |
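All jobs accept the same `start-date` formats. A minimal sketch of how these formats could be mapped to a single point in time with `java.time` follows; it assumes UTC for the date-only forms and `now` as the fallback, and `StartDates` is a hypothetical helper rather than the project's actual parser.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

/** Hypothetical helper (not the project's code): turns a start-date argument into an Instant. */
final class StartDates {
  static Instant parse(String value) {
    if (value == null || value.isEmpty() || value.equals("now")) {
      return Instant.now(); // documented default: start from the current time
    }
    try {
      // 2020-01-01T10:30Z, 2020-01-01T10:30:00Z, 2020-01-01T10:30:00.000Z (seconds are optional)
      return OffsetDateTime.parse(value).toInstant();
    } catch (DateTimeParseException notADateTime) {
      // not a full timestamp, try the date-only formats below
    }
    try {
      // 2019-01-01, interpreted as midnight UTC (assumption)
      return LocalDate.parse(value).atStartOfDay(ZoneOffset.UTC).toInstant();
    } catch (DateTimeParseException notADate) {
      // not a full date either
    }
    // 2019-01, interpreted as the first day of the month at midnight UTC (assumption);
    // throws DateTimeParseException for any other input
    return YearMonth.parse(value).atDay(1).atStartOfDay(ZoneOffset.UTC).toInstant();
  }
}
```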
The Kafka topics that the `import` sub-project writes to can be used directly in SQL queries with the following table definitions. Just point them to your Kafka server(s) and adjust other properties as needed, e.g. Kafka topic names or watermark definitions:
```sql
CREATE TABLE `flink_commits` (
  `author` STRING,
  `authorDate` TIMESTAMP(3),
  `authorEmail` STRING,
  `commitDate` TIMESTAMP(3),
  `committer` STRING,
  `committerEmail` STRING,
  `filesChanged` ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>,
  `sha1` STRING,
  `shortInfo` STRING,
  WATERMARK FOR `commitDate` AS `commitDate` - INTERVAL '1' DAY
)
COMMENT 'Commits on the master branch of github.com/apache/flink'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-commits',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```
```sql
CREATE TABLE `flink_pulls` (
  `closedAt` TIMESTAMP(3),
  `commentsCount` INT,
  `createdAt` TIMESTAMP(3),
  `creator` STRING,
  `creatorEmail` STRING,
  `description` STRING,
  `labels` ARRAY<STRING>,
  `mergeCommit` STRING,
  `mergedAt` TIMESTAMP(3),
  `number` INT,
  `state` STRING,
  `title` STRING,
  `updatedAt` TIMESTAMP(3),
  WATERMARK FOR `createdAt` AS `createdAt` - INTERVAL '7' DAY
)
COMMENT 'Pull requests opened for the master branch of github.com/apache/flink'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-pulls',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```
```sql
CREATE TABLE `flink_ml_dev` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to dev@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-dev',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```
```sql
CREATE TABLE `flink_ml_user` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to user@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-user',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```
```sql
CREATE TABLE `flink_ml_user_zh` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to user-zh@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-user-zh',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```
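The `WATERMARK` clauses above let each table tolerate out-of-order records: roughly, the watermark trails the largest event time seen so far by the given interval (one day for commits and mails, seven days for pull requests), and records at or behind it count as late. The following simplified model illustrates only that idea; `BoundedDelayWatermark` is a hypothetical name, and Flink's actual watermark generation is more involved (for example, watermarks are emitted periodically rather than per record).

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified, hypothetical model of a bounded-delay watermark such as
 * "WATERMARK FOR commitDate AS commitDate - INTERVAL '1' DAY". Not Flink's implementation.
 */
final class BoundedDelayWatermark {
  private final Duration maxDelay;
  private Instant maxSeen = null; // largest event time observed so far, null before any record

  BoundedDelayWatermark(Duration maxDelay) {
    this.maxDelay = maxDelay;
  }

  /** Largest event time seen minus the allowed delay, or Instant.MIN before any record. */
  Instant currentWatermark() {
    return maxSeen == null ? Instant.MIN : maxSeen.minus(maxDelay);
  }

  /** Registers a record's event time and returns true if it is late (not after the watermark). */
  boolean observe(Instant eventTime) {
    boolean late = !eventTime.isAfter(currentWatermark());
    if (maxSeen == null || eventTime.isAfter(maxSeen)) {
      maxSeen = eventTime;
    }
    return late;
  }
}
```

With a one-day delay, a commit dated half a day before the newest one seen so far is still on time, while one dated two days earlier is late.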
The `sql-functions` sub-project contains a few user-defined functions that you can use to simplify your SQL queries when analyzing the repository. You can find them in the `com.ververica.platform.sql.functions` package.
The following SQL statements are just examples of what you can look for in the available data; there is much more to find out. A few more examples were presented in the Flink Forward Global 2020 talk "A Year in Flink - Flink SQL Live Coding".
```sql
SELECT
  TUMBLE_END(`date`, INTERVAL '365' DAY(3)) AS windowEnd,
  COUNT(DISTINCT fromEmail) AS numUsers
FROM flink_ml_user
GROUP BY TUMBLE(`date`, INTERVAL '365' DAY(3));
```
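To see which `windowEnd` values to expect, the sketch below models how a tumbling window assigns a timestamp to a window. It assumes windows aligned to the epoch (1970-01-01T00:00Z) with no offset, which is Flink's default, and `TUMBLE_END` reporting the exclusive upper bound. One consequence: 365-day windows do not line up with calendar years, e.g. a message from 2020-06-01 falls into the window from 2019-12-20 to 2020-12-19. `TumblingWindows` is a hypothetical helper for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of how a tumbling window (as in TUMBLE) assigns timestamps to windows. */
final class TumblingWindows {
  /** Start of the epoch-aligned window of the given size that contains the timestamp. */
  static Instant windowStart(Instant ts, Duration size) {
    long t = ts.toEpochMilli();
    long s = size.toMillis();
    // floorMod keeps the result correct for timestamps before 1970 as well.
    return Instant.ofEpochMilli(t - Math.floorMod(t, s));
  }

  /** Exclusive end of that window, i.e. the boundary TUMBLE_END reports. */
  static Instant windowEnd(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size);
  }
}
```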
```sql
SELECT
  SESSION_END(`date`, INTERVAL '30' DAY) AS windowEnd,
  NormalizeEmailThread(subject) AS thread,
  COUNT(*) AS numMessagesInThread
FROM flink_ml_user
WHERE `date` > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
GROUP BY SESSION(`date`, INTERVAL '30' DAY), NormalizeEmailThread(subject)
HAVING COUNT(*) < 2;
```
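The query above looks for threads on the user mailing list that received only a single message within a session, i.e. presumably questions that nobody answered. A thread's session stays open as long as new messages arrive within 30 days of the previous one. A simplified model of this grouping for the messages of one thread follows; boundary handling is simplified, and `SessionWindows` is a hypothetical helper, not part of the project.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of session windows (as in SESSION) for one thread. */
final class SessionWindows {
  /** Groups sorted timestamps into sessions separated by pauses longer than the gap. */
  static List<List<Instant>> sessions(List<Instant> sortedTimestamps, Duration gap) {
    List<List<Instant>> result = new ArrayList<>();
    List<Instant> current = new ArrayList<>();
    Instant previous = null;
    for (Instant ts : sortedTimestamps) {
      // A pause longer than the gap closes the current session and starts a new one.
      if (previous != null && ts.isAfter(previous.plus(gap))) {
        result.add(current);
        current = new ArrayList<>();
      }
      current.add(ts);
      previous = ts;
    }
    if (!current.isEmpty()) {
      result.add(current);
    }
    return result;
  }

  /** Number of sessions with a single message, i.e. the rows HAVING COUNT(*) < 2 keeps. */
  static long singleMessageSessions(List<Instant> sortedTimestamps, Duration gap) {
    return sessions(sortedTimestamps, gap).stream().filter(s -> s.size() < 2).count();
  }
}
```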
This is basically the SQL version of the Flink Repository Analytics with the DataStream API introduced above.
```sql
SELECT
  TUMBLE_END(commitDate, INTERVAL '30' DAY) AS windowEnd,
  GetSourceComponent(filename),
  SUM(linesChanged) AS linesChanged
FROM flink_commits CROSS JOIN UNNEST(filesChanged) AS t
WHERE commitDate > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
GROUP BY TUMBLE(commitDate, INTERVAL '30' DAY), GetSourceComponent(filename)
HAVING SUM(linesChanged) > 1000;
```
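`CROSS JOIN UNNEST(filesChanged)` produces one row per changed file, which is then grouped by component and filtered by `HAVING`. The same flatten-group-filter pipeline can be sketched with Java streams, ignoring the 30-day windows for brevity. The `component` function is a hypothetical stand-in for `GetSourceComponent` that simply takes the first path segment (e.g. `flink-runtime`); the project's UDF may map files differently.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical sketch of UNNEST + GROUP BY + HAVING over commits' changed files. */
final class LinesChangedPerComponent {
  /** Mirrors the fields of one filesChanged element that the query uses. */
  static final class FileChange {
    final String filename;
    final int linesChanged;

    FileChange(String filename, int linesChanged) {
      this.filename = filename;
      this.linesChanged = linesChanged;
    }
  }

  /** Hypothetical stand-in for GetSourceComponent: the first path segment of the file. */
  static String component(String filename) {
    int slash = filename.indexOf('/');
    return slash < 0 ? filename : filename.substring(0, slash);
  }

  /** Flattens all commits' file changes, sums lines per component, keeps sums above threshold. */
  static Map<String, Long> aggregate(List<List<FileChange>> commits, long threshold) {
    Map<String, Long> sums = commits.stream()
        .flatMap(List::stream) // like CROSS JOIN UNNEST(filesChanged)
        .collect(Collectors.groupingBy(f -> component(f.filename), TreeMap::new,
            Collectors.summingLong(f -> f.linesChanged))); // GROUP BY component, SUM(...)
    sums.values().removeIf(sum -> sum <= threshold); // HAVING SUM(linesChanged) > threshold
    return sums;
  }
}
```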
```sql
SELECT
  TUMBLE_END(`date`, INTERVAL '30' DAY) AS windowEnd,
  component,
  COUNT(*) AS createdTickets
FROM flink_ml_dev
CROSS JOIN UNNEST(GetJiraTicketComponents(textBody)) AS c (component)
WHERE `date` > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
  AND IsJiraTicket(fromRaw)
  AND GetJiraTicketComponents(textBody) IS NOT NULL
GROUP BY TUMBLE(`date`, INTERVAL '30' DAY), component
HAVING COUNT(*) > 10;
```
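`IsJiraTicket` and `GetJiraTicketComponents` hide the parsing of Jira notification emails. Their actual implementation is not shown here; the following hypothetical stand-ins only illustrate the idea and assume that ticket emails are sent from `jira@apache.org` and that the body contains a line such as `Component/s: Runtime / Checkpointing, Table SQL / API`. Returning `null` when no components are found matches the `IS NOT NULL` check in the query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * Hypothetical stand-ins for IsJiraTicket and GetJiraTicketComponents. The sender address and
 * the "Component/s:" line are assumptions about Jira emails, not the project's code.
 */
final class JiraMail {
  private static final String COMPONENTS_PREFIX = "Component/s:";

  /** Assumption: Jira notifications come from jira@apache.org. */
  static boolean isJiraTicket(String fromRaw) {
    return fromRaw != null && fromRaw.toLowerCase(Locale.ROOT).contains("jira@apache.org");
  }

  /** Returns the comma-separated components, or null if the body has no such line. */
  static List<String> components(String textBody) {
    if (textBody == null) {
      return null;
    }
    for (String line : textBody.split("\\R")) {
      String trimmed = line.trim();
      if (trimmed.startsWith(COMPONENTS_PREFIX)) {
        List<String> result = new ArrayList<>();
        for (String c : trimmed.substring(COMPONENTS_PREFIX.length()).split(",")) {
          if (!c.trim().isEmpty()) {
            result.add(c.trim());
          }
        }
        return result.isEmpty() ? null : result;
      }
    }
    return null;
  }
}
```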
A couple of other sub-projects mainly serve as helper utilities providing common functionality:
- `common` offers generic helper classes and all entities used throughout the project.
- `source-github` contains implementations of sources interacting with the GitHub API.
- `source-mbox` contains a source implementation that uses Apache James Mime4J to parse mbox archives.
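For readers unfamiliar with the mbox format: an mbox archive is a single file in which each message starts with an envelope line beginning with `From `. The naive, hypothetical splitter below illustrates only that framing. It is not how `source-mbox` works (that relies on Mime4J, which also parses headers and MIME parts), and it ignores escaping conventions such as `>From ` lines in message bodies.

```java
import java.util.ArrayList;
import java.util.List;

/** Naive, hypothetical illustration of mbox framing; source-mbox uses Apache James Mime4J. */
final class MboxSplitter {
  /** Splits an mbox archive into raw messages (headers and body, without the envelope line). */
  static List<String> split(String mbox) {
    List<String> messages = new ArrayList<>();
    StringBuilder current = null; // null until the first envelope line is seen
    for (String line : mbox.split("\\R")) {
      if (line.startsWith("From ")) {
        // Envelope line: finish the previous message and start a new one.
        if (current != null) {
          messages.add(current.toString());
        }
        current = new StringBuilder();
      } else if (current != null) {
        current.append(line).append('\n');
      }
    }
    if (current != null) {
      messages.add(current.toString());
    }
    return messages;
  }
}
```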