How to setup a Pinot cluster

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
Hi! In this video we are going to learn how to set up a Pinot cluster. Apache Pinot is a real-time OLAP data store that can provide ultra-low latency even at high throughput. It can ingest data from batch data sources such as Hadoop, S3, Azure, Google Cloud Storage, or from streaming data sources such as Kafka, EventHub or Kinesis. Originally built at LinkedIn, Pinot can power a variety of analytical applications such as real-time - dashboarding applications like Superset anomaly detection applications like ThirdEye, rich interactive user-facing analytics data products such as Company Analytics, Who Viewed My Profile, Uber Eats restaurant analytics, and many more. Pinot is also used at companies such as Uber, Microsoft, Weibo, Factual, Slack In this tutorial we will set up a Pinot cluster with the following components 1 zookeeper, 2 controllers, 2 brokers, 2 servers. Once the cluster is up and running, we will see how to load data into Pinot and query it. At the end, we will show how Pinot is resilient to failures. Before we get started make sure to go over this list of prerequisites and download the necessary software. Download the sample data and configs from this Github repo. The readme in this repository has all the commands that we will use in this tutorial. Download the latest Pinot release from pinot.apache.org Make sure you have Java 9 or higher, and Apache Maven 3.4 or higher installed. To browse the zookeeper instance will need Zooinspector from this Github repo. And lastly download Apache Kafka binary 2.4 or higher. I have created a directory pinot-tutorial. In this directory I have, the pinot-tutorial repository, the latest Pinot release 0.3.0 untarred, zooinspector installed and built as per the instructions in the ReadMe and Apache Kafka 2.4.1 downloaded and untarred. Let's get started! Now that we have everything we need let's start bringing up the Pinot components one by one. The first component is Zookeeper. Pinot uses Apache zookeeper as a persistent metadata store. We will start zookeeper using the pinot-admin script which can be found in the apache-pinot download, and the command bin/pinot-admin.sh StartZookeeper and we will provide the ZK port. Let's use 2181. This starts zookeeper on the local host and port 2181. Let's connect to the zookeeper instance we started using Zooinspector There's nothing in the Zookeeper right now. Let's start the rest of our components, and we'll see them appear in zooinspector one by one. The next component is the Pinot controller Pinot uses Apache Helix for cluster management. The Pino controller hosts Apache Helix and together they are responsible for managing all the other components of the cluster To start a controller will use the pinot-admin script, along with the command StartController. We'll provide the ZK address of the zookeeper instance we just started which is localhost 2181. We will also give a name to our cluster, let's call it PinotCluster. And we'll define the port on which we want our controller to start Let's use 9001. Let's go look at Zooinspector to see what's changed. Look at that, we have a new cluster called Pinot cluster. Let's look inside the cluster. We have a configs folder. Within that is a cluster folder, which has some cluster-level config properties. We have a participants folder, which lists all the participants of the cluster. So far we only have the controller that we just started. Let's add another controller to this cluster. This time let's use port 9002. Now let's get back to the zooinspector. We can see our second controller under participants. In the controller directory we can see a leader node which tells us which of the two controllers is the lead controller The lead controller has additional responsibilities such as running some periodic maintenance and cleanup tasks in the background. Alright let's see what more can we do with our controller. Head over to a browser window and type localhost:9001 There's two options here - Query Console and REST API The Query Console lets us run queries on the tables in our cluster. As you can see, there's nothing there as of now. So let's come back to this later. The Rest API has admin endpoints to operate and manage the cluster. Here you can perform read/write/delete operations on other entities of a cluster such as the tables schemas and segments. We look at all these in detail later on in this video. Now let's start the next component - the broker. Brokers are the components that accept queries, forward them to the data servers, receive the results from the servers, merge and send the results back. To start a broker, we are going to use the launcher script with the command StartBroker. We'll provide the address of our ZK instance, and the cluster name PinotCluster, and the port we'd like to start it on. Let's use 7001. Let's also start another broker on port 7002. Let's go look for our brokers in zooinspector. We have our brokers here under participants, one on port 7001 and the other on 7002. Next we add servers. Servers are responsible for hosting data segments and serving queries off them. To start a server will use the launcher script, along with the command, StartServer. Again we provide the ZK address, the cluster name, a server port, let's use 8001, And additionally a server admin port, let's use 8011. Let's also start another server on port 8002 and admin port 8012. We can see our servers under PARTICIPANTS, started on port 8001 & 8002 respectively. Ok so our cluster is up and running. Quick recap, we started 1 zookeeper instance, 2 controller nodes, 2 broken nodes and 2 server nodes. Now, it's time to get some data into the cluster. For that, let's talk about the logical components of a Pinot cluster - the tenants, tables, schemas and segments. Let's start with tenants. A tenant is simply a logical grouping of nodes, formed by giving the nodes the same Helix tag. In our cluster, we have a default tenant created, called "DefaultTenant". When nodes are created in the cluster, they automatically get added to the DefaultTenant. Let's look at the broker nodes. We see they have a tag called "DefaultTenant_BROKER". Let's look at the servers. They have a tag called "DefaultTenant_OFFLINE" and "DefaultTenant_REALTIME". These tags determine which tenants the nodes belong to. We look at this more closely when we create a table. Next, we will look at schema, table and segment. Let's head over to the examples we downloaded, from the pinot-tutorial repo. There's some sample data in raw data/transcript.csv It has columns like studentID, firstName, lastName, subject, score and timestamp. To get this data into Pinot, we need a Pinot schema and a Pinot table. Let's look at the schema file - transcript-schema.json. The schema categorizes columns into dimensions, metrics and time. In our sample transcript schema, we have dimensions studentID, firstName, lastName & subject. We have the metric score and we have a time column timestamp. We also define data types of the columns here. Next, let's look at the table config file - transcript-table-offline.json. A table refers to a collection of data, consisting of rows and columns. This concept is similar to other databases. The table config file has the table name, the table type - which in this file is OFFLINE and some other information, such as schema name, the time column name and replication. Look at the tenant section. This is where we define which tenant this table will belong to. This means that this table's data and queries only use the brokers and server tagged "DefaultTenant". This helps us achieve isolation between various tables if necessary. No need to create multiple clusters for every use case. Now it's time to upload the schema and table. I'm going to save the path to these configs, into a variable BASE_DIR Then we'll use the launcher script and the command AddTable. We provide the table config file path, the schema file and the controller information, and run this command. Great, now we have this table and schema created in the cluster. Let's go to the admin UI. We see our table listed under Tables and the config listed here. And also the scheme listed under Schemas. So where's this being stored? These configs are stored in the property store. In zookeeper, the table config can be found under CONFIGS, schema can be found under SCHEMAS. Also take a look at the Ideal State. We have a new entry for the transcript_OFFLINE table. It's empty at the moment, as there are no segments. There's a similar entry under External View, which is also empty. Ideal State represents the desired state of the table resource, and the External View represents the actual state. Any cluster actions usually update the Ideal State and then Helix sends state transitions to the right components. They update the External View after they are done handling the state transition. Ok, now let's add some data to this table. Data in the Pinot table is stored as segments, which is essentially small chunks of the data. This is similar to shards or partitions of a relational database. Each segment packs the data in a columnar fashion, along with the dictionaries and indices for the columns. To make a segment and upload it to the cluster we first need an ingestion job spec file. Take a look at batch-job-spec.yaml. It contains configs such as input directory and output directory. Make sure these are correctly set up as per your directory structure. It also has data format and controller details. Let's run this command using launch ingestion job and provide the job spec file you okay our segment should have made it to the table we can see it in the property store look at the metadata we can see the segment push time the start and end time of the data in the segment also the number of documents or records in the segment we can also see it in the ideal state and the external view the segment has two replicas based on the replication that we set in the table config one replica is assigned to each server it's time to go explore the data we just uploaded head over to the query console here we can run sequel like queries on the data such as selections aggregations aggregation group bys and much more a full list of queries supported can be found in the documentation on the Apache Pina website so how will the query is going from console to the right broker look at the broker resource in zoo inspector the broker keeps a mapping of the table name to the broker instances internally the broker also computes routing tables which is a mapping between segments and servers and that's how the broker knows how to pick the servers to answer the given query okay so we got some static data into pino using a batch job what about reading from a stream let's bring up Kafka and create a sample stream to consume from using the peano admin launcher scripts and the command start Kafka we can set up a local Kafka cluster so the command is Pino admin dot Sh start Kafka let's use the same zookeeper instance that we have set up for Pino so that's localhost colon - 1 8 1 slash Kafka and let's provide a port for the Kafka broker 9 8 7 6 ok so this started Kafka for us we can create a topic in the SCAF cluster using the Kafka binary that we downloaded using Kafka topics dot SH - - create and bootstrap server is the broker that we created in Kafka localhost nine eight seven six replication factor one let's create two partitions for the topic and give the topic the name transcript - topic okay let's go to zoo inspector and check out the Kafka instance on zoo inspector we see the topic we just created under topics now we'll create a real time table look at transcript real time table dot JSON in the peano tutorial example data note that this looks similar to the previous table some of the differences are the table type which is real time and we also have a stream config section which defines conflicts related to the Kafka stream and to create this table we will use the admin script with the command add table and provide the table config file and the controller port 9 0 0 1 similar to before we can see this table in property store in the external view and in the ideal state there's two segments one per partition of the khakha topic there's two replicas based on the replication we defined in the table config the segment state says consuming that's because the real-time segments will be in memory segments consuming from the stream and periodically get flushed onto the disk that's when they become online we can query the segment even when it's consuming let's publish some data into the stream we have a sample JSON data file in raw data / transcript or JSON as soon as we push data into the stream it will be read by the consuming segment indexed and be ready to query to publish data into the caca topic let's use the Kafka script Kafka console producer dot SH - - broker lists localhost nine eight seven six - - topic transcript topic and provide the transcript JSON file let's go look at the data in the query console we have real-time data now Pinot follows the lambda architecture seamlessly merging between the offline and the real-time data streams you can see the offline and real-time data separately by appending underscore offline or underscore real-time to the query the time column plays an important role in such hybrid tables the overlap between offline and real-time data is noted in the broker as the time boundary Pinot will use the offline data as much as possible and real-time data only for the latest events which are not present in the offline okay so far we've set up a Pinot cluster with zookeeper controllers brokers servers we created tenants tables schemas we pushed batch data and also streaming data let's talk a little bit about fault tolerance earlier in the video we created two controllers and saw that one of them was designated as the leader now what happens if the lead controller goes down let's find out I'm going to stop the lead controller process and now let's check out the leader node on so inspector the other controller has taken over leadership the cluster is still up and running and the ingestion is still going on now what if we lose one of the brokers let's stop one of the broker nodes the broker resource node is updated to report just one broker for the stable our queries are unaffected let's do the same experiment with servers I'm stopping one of the server processes the ideal state will keep this entry intact because the slower is still part of the participants and still under default tenant but the external view has been updated to reflect only one active server that's how the broker knows the server's it should query now when the server comes back up it will rejoin the cluster and the external view will be restored to its initial State that's it for this video don't forget to check out the pinot dot apache dot org website for community resources and documentation follow us on twitter at Apache pino for latest announcements meetups and updates thanks
Info
Channel: StarTree
Views: 2,572
Rating: 4.9069767 out of 5
Keywords: distributed systems, apache, apache pinot, realtime analytics, presto, apache druid, druid, OLAP, realtime database, video tutorial
Id: cNnwMF0pOJ8
Channel Id: undefined
Length: 21min 59sec (1319 seconds)
Published: Wed Mar 25 2020
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.