Confluent Platform includes the Java producer and consumer shipped with Apache Kafka®.
Java Client installation¶
All JARs included in the packages are also available in the Confluent Maven repository. Here’s a sample POM file showing how to add this repository:
The Confluent Maven repository includes compiled versions of Kafka.
To reference the Kafka version 2.6 that is included with Confluent Platform 6.0.0, use the following in your `pom.xml`:
Note
Version names of Apache Kafka vs. Kafka in Confluent Platform: Confluent always contributes patches back to the Apache Kafka® open source project. However, the exact versions (and version names) included in Confluent Platform may differ from the Apache artifacts when Confluent Platform and Kafka releases do not align. If they are different, Confluent keeps the `groupId` and `artifactId` identical, but appends the suffix `-ccs` to the version identifier of the Confluent Platform version to distinguish these from the Apache artifacts.
You can reference artifacts for all Java libraries that are included with Confluent Platform. For example, to use the Avro serializer you can include the following in your `pom.xml`:
Tip
You can also specify the `kafka-protobuf-serializer` or `kafka-json-schema-serializer` serializers. For more information, see Schema Formats, Serializers, and Deserializers.
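As a sketch, the repository and dependency entries described above might be combined in a `pom.xml` like the following; the version numbers correspond to Confluent Platform 6.0.0 and should be verified against the repository:

```xml
<project>
  <!-- ... -->
  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>

  <dependencies>
    <!-- Kafka bundled with Confluent Platform 6.0.0: note the -ccs suffix -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>6.0.0-ccs</version>
    </dependency>
    <!-- Avro serializer shipped with Confluent Platform -->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>6.0.0</version>
    </dependency>
  </dependencies>
</project>
```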
Java Client example code¶
For Hello World examples of Kafka clients in Java, see Java. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.
Kafka Producer¶
Initialization¶
The Java producer is constructed with a standard `Properties` file. Configuration errors will result in a `KafkaException` raised from the constructor of `KafkaProducer`.
Asynchronous writes¶
The Java producer includes a `send()` API which returns a future that can be polled to get the result of the send. To invoke some code after the write has completed, you can also provide a callback. In Java this is implemented as a `Callback` object:
In the Java implementation you should avoid doing any expensive work in this callback, since it is executed in the producer’s IO thread.
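A minimal sketch of an asynchronous write with a `Callback`, assuming a broker at `localhost:9092`, `kafka-clients` on the classpath, and an illustrative topic name:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncProducerExample {
    public static void main(String[] args) {
        // Standard Properties-based configuration; a KafkaException is
        // raised from the constructor if the configuration is invalid.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key", "value");
            // The Callback runs on the producer's IO thread once the write
            // completes, so keep the work done here cheap.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.printf("wrote to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    }
                }
            });
        } // close() flushes any outstanding sends
    }
}
```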
Synchronous writes¶
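Since `send()` returns a `Future<RecordMetadata>`, a synchronous write is simply a blocking `get()` on that future. A minimal sketch, assuming an already-configured `KafkaProducer`:

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncWriteExample {
    // Blocks until the broker acknowledges the write (or the send fails).
    // 'producer' is assumed to be an already-configured KafkaProducer.
    static RecordMetadata writeSync(KafkaProducer<String, String> producer,
                                    String topic, String key, String value)
            throws ExecutionException, InterruptedException {
        // send() returns a Future<RecordMetadata>; calling get() turns the
        // asynchronous write into a synchronous one.
        return producer.send(new ProducerRecord<>(topic, key, value)).get();
    }
}
```

Blocking on every send trades throughput for a per-record delivery guarantee, so this pattern is usually reserved for low-volume or strictly ordered writes.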
Kafka Consumer¶
Initialization¶
The Java consumer is constructed with a standard `Properties` file. Configuration errors will result in a `KafkaException` raised from the constructor of `KafkaConsumer`.
Basic usage¶
The Java client is designed around an event loop which is driven by the `poll()` API. This design is motivated by the UNIX `select` and `poll` system calls. A basic consumption loop with the Java API subscribes to one or more topics and then calls `poll()` repeatedly until the application shuts down.
There is no background thread in the Java consumer. The API depends on calls to `poll()` to drive all of its IO, including:
- Joining the consumer group and handling partition rebalances.
- Sending periodic heartbeats if part of an active generation.
- Sending periodic offset commits (if autocommit is enabled).
- Sending and receiving fetch requests for assigned partitions.
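The event-loop style described above might be sketched as follows; the broker address, topic, and group id are illustrative assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BasicConsumeLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // illustrative group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // Each poll() call drives all IO: rebalances, heartbeats,
                // offset commits, and fetch requests.
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                        record.topic(), record.partition(),
                        record.offset(), record.value());
                }
            }
        }
    }
}
```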
Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to `poll()`. This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.
This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it during normal record processing. The `max.poll.records` configuration option places an upper bound on the number of records returned from each call to `poll()`. You should use a fairly high session timeout (e.g. 30 to 60 seconds) and keep the number of records processed on each iteration bounded, so that worst-case behavior is predictable.
If you fail to tune these settings appropriately, the consequence is typically a `CommitFailedException` raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens, since the consumer silently ignores commit failures internally (unless it happens often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.
Java Client code examples¶
Basic poll loop¶
The consumer API is centered around the `poll()` method, which is used to retrieve records from the brokers. The `subscribe()` method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to `subscribe()` to set up the topics of interest and then a loop which calls `poll()` until the application is shut down.
The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling `poll()`. In the consumer example below, the poll loop is wrapped in a `Runnable`, which makes it easy to use with an `ExecutorService`.
The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, then `poll()` will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.
To shut down the consumer, a flag is added which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down, since shutdown might be triggered while the consumer is in `poll()`. A better approach is provided in the next example.
Note that you should always call `close()` after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. A latch is added to this example to ensure that the consumer has time to finish closing before shutdown completes.
Shutdown with wakeup¶
An alternative pattern for the poll loop in the Java consumer is to use `Long.MAX_VALUE` for the timeout. To break from the loop, you can use the consumer’s `wakeup()` method from a separate thread. This will raise a `WakeupException` from the thread blocking in `poll()`. If the thread is not currently blocking, then this will wake up the next poll invocation.
Synchronous commits¶
The simplest and most reliable way to manually commit offsets is using a synchronous commit with `commitSync()`. As its name suggests, this method blocks until the commit has completed successfully.
In this example, a try/catch block is added around the call to `commitSync`. The `CommitFailedException` is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the `session.timeout.ms` setting to ensure that the handler has enough time to finish processing messages. You can then tune `max.partition.fetch.bytes` to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to `offer()` while the background thread is handling an even larger batch of messages. The Java API offers a `pause()` method to help in these situations.
For now, you should set `session.timeout.ms` large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with `close()`). This should be rare in practice.
You should be careful in this example, since `wakeup()` might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
Delivery guarantees¶
In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with the commit failure, so you should change `doCommitSync` to return whether or not the commit succeeded. There’s also no longer any need to catch the `WakeupException` in the synchronous commit. Correct offset management is crucial because it affects delivery semantics.
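As a sketch of this reordering, a hypothetical `doCommitSync()` helper can report whether the commit succeeded; consumer construction is omitted and names are illustrative:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceSketch {
    // Returns true if the synchronous commit succeeded. A
    // CommitFailedException means the group was rebalanced and the
    // partitions have been reassigned.
    static boolean doCommitSync(KafkaConsumer<String, String> consumer) {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            return false;
        }
    }

    static void pollOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // Committing BEFORE processing yields "at most once": if processing
        // fails after a successful commit, the records are not redelivered.
        if (doCommitSync(consumer)) {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```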
Asynchronous commits¶
The API gives you a callback which is invokedwhen the commit either succeeds or fails:
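A minimal sketch using `commitAsync()` with an `OffsetCommitCallback`; the consumer is assumed to be already constructed:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitSketch {
    static void commitWithCallback(KafkaConsumer<String, String> consumer) {
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                   Exception exception) {
                if (exception != null) {
                    // The commit failed; log it and rely on a later commit
                    // (or a synchronous commit on close) to advance offsets.
                    System.err.println("commit failed: " + exception.getMessage());
                }
            }
        });
    }
}
```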
In the example below, synchronous commits are incorporated on rebalances and on close. For this, the `subscribe()` method has a variant which accepts a `ConsumerRebalanceListener`, which has two methods to hook into rebalance behavior.
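One way to sketch this pattern, assuming an already-constructed consumer and an illustrative topic name:

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerSketch {
    static void subscribeWithListener(final KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Commit processed offsets synchronously before losing
                    // ownership of the partitions.
                    consumer.commitSync();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Nothing to do on assignment in this sketch.
                }
            });
    }
}
```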
Suggested Reading¶
Blog post: Multi-Threaded Message Consumption with the Apache Kafka Consumer
Confluent Cloud bills are based on your consumption of resources within your cloud organization. Discounts based on usage are available with annual commitments.
Billing for each Confluent Cloud component accrues at hourly intervals. To view billing information, log into Confluent Cloud and visit the Billing & payment screen.
Confluent Cloud billing integrates with Google Cloud Platform, Microsoft Azure, and Amazon Web Services. Sign up with your cloud provider to pay via your cloud provider billing account.
Annual commitments¶
Confluent Cloud offers the ability to make a commitment to a minimum amount of spend. This commitment gives you access to discounts and provides the flexibility to use this commitment across the entire Confluent Cloud stack, including any Kafka cluster type, ksqlDB on Confluent Cloud, Connectors, and Support. With self-serve provisioning and expansion, you have the freedom to consume only what you need from a commitment at any point in time.
With annual commitments you can view the total amount of accrued usage during the commitment term and the amount of time left on your commitment.
If you use more than your committed amount, you can continue using Confluent Cloud without interruption. You will be charged at your discounted rate for usage beyond the committed amount. Charges will go to the payment method set for your organization.
Contact Confluent to learn more about annual commitments.
Billing profile¶
To view your billing profile, navigate to the Billing & payment screen and select the Payment details & contacts tab.
You can obtain your Cloud Organization ID, edit your billing information, add an address for tax purposes, or claim a Promo Code.
You can use all major credit cards with Confluent Cloud. To switch to invoicing, contact the Confluent sales team.
Credit card payment receipts or invoices are emailed to the address that was initially provided during sign up for Confluent Cloud. To change the billing email address, contact support or your account team. The invoice can only be sent to a single email address.
Billing Dimensions¶
Kafka¶
Confluent Cloud offers Basic, Standard, and Dedicated Kafka cluster types. Kafka clusters are billed based on the dimensions below.

Basic clusters:

Dimension | Unit of measure |
---|---|
Ingress | Cost per GB |
Egress | Cost per GB |
Storage | Cost per GB |
Partitions | Cost per partition per hour (Effective July 1, 2020) |

Standard clusters:

Dimension | Unit of measure |
---|---|
Base price | Cost per hour |
Ingress | Cost per GB |
Egress | Cost per GB |
Storage | Cost per GB per hour |
Partitions | Cost per partition per hour (Effective July 1, 2020) |

Dedicated clusters:

Dimension | Unit of measure |
---|---|
CKU price | Cost per CKU per hour |
Ingress | Cost per GB |
Egress | Cost per GB |
Storage | Cost per GB per hour |
Confluent Cloud charges for partitions on Basic and Standard clusters. You are charged for the number of unique partitions that exist on your cluster during a given hour. Basic clusters receive 10 partitions free of charge, and Standard clusters receive 500 partitions free of charge. Dedicated clusters have no partition-based charges.
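As an illustration only, the free-allowance rule reduces to simple arithmetic; the per-partition-hour rate below is entirely hypothetical (actual unit prices appear on the New cluster screen):

```java
public class PartitionBillingSketch {
    // Hourly partition charge: partitions beyond the free allowance are
    // billed at the per-partition-hour rate. The rate used below is
    // hypothetical, not a published Confluent price.
    static double hourlyPartitionCharge(int partitions, int freeAllowance,
                                        double ratePerPartitionHour) {
        int billable = Math.max(0, partitions - freeAllowance);
        return billable * ratePerPartitionHour;
    }

    public static void main(String[] args) {
        // A Basic cluster with 60 partitions and a 10-partition free
        // allowance is billed for 50 partition-hours each hour.
        System.out.println(hourlyPartitionCharge(60, 10, 0.004));
    }
}
```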
Unit prices for each configuration are shown in the New cluster screen:
Note
You are billed for the total amount of data transferred in and out of your cluster, including request overhead associated with the Kafka protocol.
Kafka Connect¶
If you use a Dedicated Kafka cluster, connectors on Confluent Cloud run on your own dedicated Connect cluster. Confluent Cloud provisions the dedicated Connect cluster when you launch your first connector. If you use a Basic or Standard Kafka cluster, your connector runs in multi-tenant Kafka Connect clusters. For connectors running on a multi-tenant Connect cluster, there is no Connect capacity (`ConnectCapacity`) charge.
The dedicated cluster and fully-managed connectors are billed based on the following dimensions:
Dimension | Unit of measure |
---|---|
Dedicated cluster | Cost per hour |
Task base price | Cost per task per hour |
Throughput | Cost per GB |
Fully-managed connector pricing is displayed in the Add Connector screen:
Dedicated Connect cluster hourly pricing is shown on the Review and launch screen the first time you launch a connector.
Dedicated cluster billing is shown as a line item on the Billing & payment > Billing screen.
Self-managed connectors have no billing mechanism themselves. However, note that using self-managed connectors may incur ingress, egress, and storage charges for your Kafka clusters running in Confluent Cloud.
ksqlDB¶
Fully-managed ksqlDB is billed based on the following dimension:
Dimension | Unit of measure |
---|---|
Confluent Streaming Unit | Cost per Confluent Streaming Unit per hour |
In addition to the per-Confluent Streaming Unit charge, ksqlDB applications may influence Kafka ingress, egress, and storage.
Fully-managed ksqlDB pricing is displayed in the Add an Application screen:
Support¶
Support plans are available for purchase from the Confluent Cloud UI at https://confluent.cloud/settings/paid_support. If you have an annual commitment, support charges contribute to your total committed spend amount.
Certain downgrade restrictions apply to support plan purchases. Your current support level will stay in effect until the end of the current calendar month. However, if you downgrade within the month of purchase, the current level is maintained until the end of the next full calendar month.
View invoices¶
You can view your current and past monthly invoices by navigating to the Billing tab on the Billing & payment screen. To view invoices from past months, select an option from the dropdown.
Promo codes¶
If you have a promotional code for Confluent Cloud, you have these options to claim the code:
- Navigate to the Payment details & contacts tab on the Billing & payment page and click + Promo code.
- On the New cluster screen, after you click Continue, enter the promotional code before you click Launch.