Hi! In this video we are going to learn
how to set up a Pinot cluster. Apache Pinot is a real-time OLAP data store that
can provide ultra-low latency even at high throughput. It can ingest data from
batch data sources such as Hadoop, S3, Azure, Google Cloud Storage, or from streaming data sources such as
Kafka, EventHub or Kinesis. Originally built at LinkedIn,
Pinot can power a variety of analytical applications such as
real-time dashboarding applications like Superset, anomaly detection applications like ThirdEye,
rich interactive user-facing analytics data products such as
Company Analytics, Who Viewed My Profile, Uber Eats restaurant analytics, and many more. Pinot is also used at companies such as
Uber, Microsoft, Weibo, Factual, and Slack. In this tutorial we will set
up a Pinot cluster with the following components: 1 Zookeeper, 2 controllers,
2 brokers, 2 servers. Once the cluster is up and running, we will see how to
load data into Pinot and query it. At the end, we will show how Pinot is resilient
to failures. Before we get started make sure to go over this list of
prerequisites and download the necessary software. Download the sample data and
configs from this Github repo. The readme in this repository has all the
commands that we will use in this tutorial. Download the latest Pinot release from pinot.apache.org. Make sure you have Java 9 or higher,
and Apache Maven 3.4 or higher installed. To browse the Zookeeper instance, we'll need
Zooinspector from this Github repo. And lastly download Apache Kafka binary 2.4 or higher. I have created a directory pinot-tutorial.
In this directory I have, the pinot-tutorial repository,
the latest Pinot release, 0.3.0, untarred, Zooinspector installed and built as per
the instructions in its README, and Apache Kafka 2.4.1 downloaded and untarred. Let's get started! Now that we have everything we need,
let's start bringing up the Pinot components one by one. The first component is Zookeeper.
Pinot uses Apache Zookeeper as a persistent metadata store. We will start Zookeeper using the pinot-admin script, which can be found in the apache-pinot download,
and the command bin/pinot-admin.sh StartZookeeper
and we will provide the ZK port. Let's use 2181.
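Put together, the command looks roughly like this (run from the untarred Pinot release directory; the flag name is the one the pinot-admin script uses, though it may vary slightly across releases):

  # Start a standalone Zookeeper instance on port 2181
  bin/pinot-admin.sh StartZookeeper -zkPort 2181
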
This starts Zookeeper on localhost, port 2181. Let's connect to the Zookeeper instance we started
using Zooinspector. There's nothing in Zookeeper right now.
Let's start the rest of our components, and we'll see them appear in
Zooinspector one by one. The next component is the Pinot controller. Pinot uses Apache Helix for cluster management. The Pinot controller hosts Apache Helix,
and together they are responsible for managing all the other components of the cluster.
To start a controller, we'll use the pinot-admin script, along with the command
StartController. We'll provide the ZK address of the Zookeeper instance
we just started, which is localhost:2181. We will also give a name to our cluster,
let's call it PinotCluster. And we'll define the port on which we want our controller to start
Let's use 9001. Let's go look at Zooinspector to see what's changed. Look at that, we have a new cluster called Pinot cluster.
Let's look inside the cluster. We have a configs folder.
Within that is a cluster folder, which has some cluster-level config properties.
We have a participants folder, which lists all the participants of the cluster.
So far we only have the controller that we just started. Let's add another controller to this cluster. This time, let's use port 9002.
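As a sketch, the two controller commands look roughly like this (flag names as used by the pinot-admin script; adjust if your release differs):

  # First controller, on port 9001
  bin/pinot-admin.sh StartController -zkAddress localhost:2181 -clusterName PinotCluster -controllerPort 9001

  # Second controller, on port 9002
  bin/pinot-admin.sh StartController -zkAddress localhost:2181 -clusterName PinotCluster -controllerPort 9002
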
Now let's get back to Zooinspector. We can see our second controller under PARTICIPANTS. In the CONTROLLER directory
we can see a LEADER node, which tells us which of the two controllers is the lead controller.
The lead controller has additional responsibilities, such as
running some periodic maintenance and cleanup tasks in the background.
Alright, let's see what more we can do with our controller. Head over to a browser
window and type localhost:9001. There are two options here:
Query Console and REST API. The Query Console lets us run queries on
the tables in our cluster. As you can see,
there's nothing there as of now. So let's come back to this later.
The REST API has admin endpoints to operate and manage the cluster. Here you can perform read/write/delete
operations on other entities of the cluster, such as tables, schemas, and segments. We'll look at all of these in detail later in this video. Now let's start the next component:
the broker. Brokers are the components that accept queries,
forward them to the data servers, receive the results from the servers,
merge them, and send the results back. To start a broker, we are going to use the launcher script
with the command StartBroker. We'll provide the address of our ZK instance,
and the cluster name PinotCluster, and the port we'd like to start it on.
Let's use 7001. Let's also start another broker on port 7002.
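Roughly, the two broker commands look like this (same caveat as before about exact flag names):

  # Brokers on ports 7001 and 7002
  bin/pinot-admin.sh StartBroker -zkAddress localhost:2181 -clusterName PinotCluster -brokerPort 7001
  bin/pinot-admin.sh StartBroker -zkAddress localhost:2181 -clusterName PinotCluster -brokerPort 7002
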
Let's go look for our brokers in Zooinspector. We have our brokers here under PARTICIPANTS, one on port 7001 and the other on 7002. Next, we add servers.
Servers are responsible for hosting data segments and serving queries off them.
To start a server, we'll use the launcher script, along with the command StartServer.
Again we provide the ZK address, the cluster name, and a server port,
let's use 8001, and additionally a server admin port,
let's use 8011. Let's also start another server, on port 8002 and admin port 8012.
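A sketch of the two server commands (again, flag names per the pinot-admin script and may differ slightly by release):

  # Servers on ports 8001 and 8002, with admin ports 8011 and 8012
  bin/pinot-admin.sh StartServer -zkAddress localhost:2181 -clusterName PinotCluster -serverPort 8001 -serverAdminPort 8011
  bin/pinot-admin.sh StartServer -zkAddress localhost:2181 -clusterName PinotCluster -serverPort 8002 -serverAdminPort 8012
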
We can see our servers under PARTICIPANTS, started on ports 8001 and 8002 respectively. OK, so our cluster is up and running.
Quick recap: we started 1 Zookeeper instance, 2 controller nodes, 2
broker nodes, and 2 server nodes. Now, it's time to get some data into the cluster.
For that, let's talk about the logical components of a Pinot cluster -
the tenants, tables, schemas and segments. Let's start with tenants.
A tenant is simply a logical grouping of nodes, formed by giving the nodes the same Helix tag.
In our cluster, we have a default tenant created, called "DefaultTenant".
When nodes are created in the cluster, they automatically get added to the DefaultTenant.
Let's look at the broker nodes. We see they have a tag called "DefaultTenant_BROKER".
Let's look at the servers. They have the tags "DefaultTenant_OFFLINE"
and "DefaultTenant_REALTIME". These tags determine which tenants the
nodes belong to. We'll look at this more
closely when we create a table. Next, we will look at schemas, tables, and segments.
Let's head over to the examples we downloaded, from the pinot-tutorial repo.
There's some sample data in the raw data directory, in transcript.csv. It has columns like
studentID, firstName, lastName, subject, score and timestamp. To get this data into Pinot,
we need a Pinot schema and a Pinot table. Let's look at the schema file -
transcript-schema.json. The schema categorizes columns into dimensions,
metrics and time. In our sample transcript schema, we have dimensions
studentID, firstName, lastName & subject. We have the metric score and we
have a time column timestamp. We also define data types of the columns here. Next, let's look at the table config file -
transcript-table-offline.json. A table refers to a collection of data,
consisting of rows and columns. This concept is similar to that in other databases.
The table config file has the table name, the table type, which in this
file is OFFLINE, and some other information, such as the schema name, the time column name, and replication.
Look at the tenant section. This is where we define which tenant
this table will belong to. This means that this table's data and queries only
use the brokers and servers tagged "DefaultTenant". This helps us achieve
isolation between various tables if necessary. No need to create multiple
clusters for every use case. Now it's time to upload the schema and table.
I'm going to save the path to these configs into a variable, BASE_DIR. Then we'll use the launcher script and
the command AddTable. We provide the table config file path,
the schema file and the controller information,
and run this command.
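As a rough sketch (BASE_DIR is a placeholder for wherever you cloned the tutorial repo, and the exact file locations inside it may differ; -exec tells the script to actually perform the upload):

  BASE_DIR=/path/to/pinot-tutorial   # placeholder: adjust to your checkout
  bin/pinot-admin.sh AddTable \
    -tableConfigFile $BASE_DIR/transcript-table-offline.json \
    -schemaFile $BASE_DIR/transcript-schema.json \
    -controllerPort 9001 \
    -exec
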
Great, now we have the table and schema created in the cluster. Let's go to the admin UI.
We see our table listed under Tables, and the config listed here. We also see the schema listed under
Schemas. So where is this being stored?
These configs are stored in the property store. In Zookeeper, the table config
can be found under CONFIGS, and the schema can be found under SCHEMAS.
Also take a look at the Ideal State. We have a new entry for the transcript_OFFLINE table.
It's empty at the moment, as there are no segments. There's a similar entry under
External View, which is also empty. Ideal State represents the desired state of
the table resource, and the External View represents the actual state.
Any cluster actions usually update the Ideal State and then Helix sends state transitions
to the right components. They update the External View after they are done
handling the state transition. Ok, now let's add some data to this table.
Data in a Pinot table is stored as segments, which are essentially small chunks of the data.
This is similar to shards or partitions in a relational database.
Each segment packs the data in a columnar fashion, along with the dictionaries and indices
for the columns. To make a segment and upload it to the cluster we
first need an ingestion job spec file. Take a look at batch-job-spec.yaml.
It contains configs such as input directory and output directory.
Make sure these are correctly set up as per your directory structure. It also has data format and controller details.
Let's run this job using the launcher script, with the command LaunchDataIngestionJob, and provide the job
spec file.
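A sketch of that command (the job spec path again depends on your checkout):

  bin/pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile $BASE_DIR/batch-job-spec.yaml
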
OK, our segment should have made it to the table. We can see it in the property store. Look at the metadata: we can see
the segment push time, the start and end time of the data in the segment, and also the
number of documents, or records, in the segment. We can also see it in the Ideal
State and the External View. The segment has two replicas, based on the
replication that we set in the table config, and one replica is assigned to each
server. It's time to go explore the data we just uploaded. Head over to the Query
Console. Here we can run SQL-like queries on the data, such as selections,
aggregations, aggregation group-bys, and much more. A full list of supported queries
can be found in the documentation on the Apache Pinot website.
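For example, assuming the broker we started on port 7001 exposes Pinot's standard SQL query endpoint at /query/sql, queries like the following could also be sent from the command line; the Query Console does the equivalent for you:

  # Hypothetical queries against the transcript table (column names from the schema above)
  curl -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT COUNT(*) FROM transcript"}' \
    http://localhost:7001/query/sql

  curl -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT subject, AVG(score) FROM transcript GROUP BY subject"}' \
    http://localhost:7001/query/sql
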
So how does the query go from the console to the right broker? Look at the
broker resource in Zooinspector. The broker resource keeps a mapping of the table name
to the broker instances. Internally, the broker also computes routing tables,
which are a mapping between segments and servers, and that's how the broker knows
how to pick the servers to answer a given query. OK, so we got some static
data into Pinot using a batch job. What about reading from a stream? Let's
bring up Kafka and create a sample stream to consume from. Using the pinot-admin
launcher script and the command StartKafka, we can set up a local Kafka
cluster. So the command is pinot-admin.sh StartKafka. Let's use the same
Zookeeper instance that we have set up for Pinot, so that's localhost:2181/kafka,
and let's provide a port for the Kafka broker: 9876.
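In other words, something like this (flag names per the pinot-admin script):

  # Start a local Kafka broker on port 9876, reusing the existing Zookeeper under a /kafka chroot
  bin/pinot-admin.sh StartKafka -zkAddress localhost:2181/kafka -port 9876
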
OK, so this started Kafka for us. We can create a topic in this Kafka
cluster using the Kafka binary that we downloaded, using kafka-topics.sh
--create. The bootstrap server is the broker that we created in Kafka,
localhost:9876, with replication factor 1. Let's create two
partitions for the topic, and give the topic the name transcript-topic.
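The full command, run from the untarred Kafka 2.4.x directory, looks roughly like this:

  bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9876 \
    --replication-factor 1 \
    --partitions 2 \
    --topic transcript-topic
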
OK, let's go to Zooinspector and check out the Kafka instance. In Zooinspector we
see the topic we just created under topics. Now we'll create a real-time
table. Look at transcript-table-realtime.json in the pinot-tutorial example
data. Note that this looks similar to the previous table. Some of the differences
are the table type, which is REALTIME, and we also have a stream config section,
which defines configs related to the Kafka stream. To create this table, we
will use the admin script with the command AddTable, and provide the table
config file and the controller port 9001, similar to before.
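A sketch of that command, mirroring the offline AddTable command above (check the repo for the exact file name of the real-time table config):

  bin/pinot-admin.sh AddTable \
    -tableConfigFile $BASE_DIR/transcript-table-realtime.json \
    -schemaFile $BASE_DIR/transcript-schema.json \
    -controllerPort 9001 \
    -exec
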
We can see this table in the property store, in the External View, and in the Ideal State. There are two
segments, one per partition of the Kafka topic, and two replicas, based on the
replication we defined in the table config. The segment state says CONSUMING.
That's because real-time segments are in-memory segments consuming
from the stream; they periodically get flushed onto the disk, and that's when they
become ONLINE. We can query a segment even while it's consuming. Let's publish
some data into the stream. We have a sample JSON data file, transcript.json, in the raw data directory.
As soon as we push data into the stream, it will be read by
the consuming segment, indexed, and be ready to query. To publish
data into the Kafka topic, let's use the Kafka script kafka-console-producer.sh,
with --broker-list localhost:9876 and --topic transcript-topic, and
provide the transcript JSON file.
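Putting that together (run from the Kafka directory; the path to transcript.json is a placeholder, so adjust it to where the sample file lives in your checkout):

  bin/kafka-console-producer.sh \
    --broker-list localhost:9876 \
    --topic transcript-topic < /path/to/transcript.json
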
Let's go look at the data in the Query Console. We have real-time data now! Pinot follows the lambda architecture, seamlessly
merging the offline and the real-time data streams. You can see the
offline and real-time data separately by appending _OFFLINE or
_REALTIME to the table name in the query.
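For instance, with the same assumption as before about the broker's SQL endpoint:

  # Query only the offline portion, then only the real-time portion, of the hybrid table
  curl -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT COUNT(*) FROM transcript_OFFLINE"}' http://localhost:7001/query/sql
  curl -H "Content-Type: application/json" -X POST \
    -d '{"sql":"SELECT COUNT(*) FROM transcript_REALTIME"}' http://localhost:7001/query/sql
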
The time column plays an important role in
such hybrid tables. The overlap between offline and real-time data is noted in
the broker as the time boundary. Pinot will use the offline data as much as
possible, and real-time data only for the latest events, which are not present in
the offline data. OK, so far we've set up a Pinot cluster with Zookeeper, controllers,
brokers, and servers. We created tenants, tables, and schemas, and we pushed batch data and
also streaming data. Let's talk a little bit about fault tolerance.
Earlier in the video, we created two controllers and saw that one of them was designated as the
leader. Now, what happens if the lead controller goes down? Let's find out. I'm
going to stop the lead controller process. Now let's check out the
LEADER node in Zooinspector: the other controller has taken over leadership. The
cluster is still up and running, and the ingestion is still going on. Now, what if
we lose one of the brokers? Let's stop one of the broker nodes. The broker
resource node is updated to report just one broker for this table, and
our queries are unaffected. Let's do the same experiment with servers. I'm
stopping one of the server processes. The Ideal State will keep this entry intact,
because the server is still part of the PARTICIPANTS and still under DefaultTenant,
but the External View has been updated to reflect only one active
server. That's how the broker knows which servers it should query. Now, when the
server comes back up, it will rejoin the cluster and the External View will be
restored to its initial state. That's it for this video! Don't forget to check out
the pinot.apache.org website for community resources and documentation.
Follow us on Twitter at @ApachePinot for the latest announcements, meetups, and updates.
Thanks!