From Apache Kafka to Amazon S3.

Step 1: Start all the services—ZooKeeper, Kafka and the Connect worker

Begin by starting ZooKeeper, Kafka and Connect:
confluent start connect
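If you want to verify that all the services came up, the same CLI can report their status (exact output varies by Confluent Platform version):
confluent status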

Step 2: Start ingesting feed data to Kafka
Create a Kafka topic with five partitions on the single Kafka broker running locally:
bin/kafka-topics --zookeeper localhost:2181 --create --topic meetups --replication-factor 1 --partitions 5
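To double-check the topic before producing to it, I can describe it with the same kafka-topics tool and the same ZooKeeper address:
bin/kafka-topics --zookeeper localhost:2181 --describe --topic meetups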
Then, to ingest the events from the public Meetup feed, use a simple piped command that combines curl and the Confluent CLI.
curl -s http://stream.meetup.com/2/rsvps | confluent produce meetups
[Figure: Ingest Events]
I can confirm that data is actually written to Kafka as follows:
confluent consume meetups --from-beginning
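If you prefer plain Kafka tooling over the Confluent CLI, an equivalent check with the console consumer looks like this (assuming the broker listens on its default port, 9092):
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic meetups --from-beginning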

Step 3: Set up credentials and create the S3 bucket

On the cloud side, first create an S3 bucket with the appropriate permissions. For instance, here I’ve created a bucket that is private and accessible by me, the user that will run the S3 connector. After setting permissions, I just need to pick a name and a region for my S3 bucket.
[Figure: S3 Bucket]
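If you would rather script this step than click through the console, a roughly equivalent private bucket can be created with the AWS CLI. This is only a sketch: bucket names are globally unique, so replace meetups with a name you own and keep it in sync with the s3.bucket.name and s3.region connector properties shown later.
aws s3api create-bucket --bucket meetups --region us-west-2 --acl private --create-bucket-configuration LocationConstraint=us-west-2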
For the S3 connector to authenticate successfully when it contacts S3, I need to set up my AWS credentials. An easy way to do that is to export two environment variables:
export AWS_ACCESS_KEY_ID=foo
export AWS_SECRET_ACCESS_KEY=bar
Make sure you replace foo and bar with your actual AWS credentials. Using environment variables to pass credentials to the connector is not the only way to authenticate. Under the covers, the S3 connector uses the default credentials provider included with the AWS SDK (DefaultAWSCredentialsProviderChain), and this makes several mainstream options for authenticating available to users.
For those who need to customize authentication even further, the S3 connector accepts a provider class as a configuration property that, in turn, can be configured with additional properties with the s3.credentials.provider. prefix. For a complete list of options, read more in the S3 connector documentation.
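As a rough sketch of what such a customization looks like in the connector configuration, the properties below name a hypothetical provider class. The class name and the vault.address property are made up for illustration; s3.credentials.provider.class and the s3.credentials.provider. prefix are the configuration hooks described above.
"s3.credentials.provider.class": "com.example.MyVaultCredentialsProvider",
"s3.credentials.provider.vault.address": "https://vault.internal.example.com:8200"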

Step 4: Configure and start the S3 sink connector

Configure the S3 connector by writing its properties in JSON format and storing them in a file called meetups-to-s3.json:
{
  "name": "meetups-to-s3",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "_comment": "The total number of Connect tasks to spawn (with the number of topic partitions as an implicit upper limit)",
    "tasks.max": "1",
    "_comment": "Which topics to export to S3",
    "topics": "meetups",
    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "meetups",
    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-west-2",
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here, a high value allows time-based partitioning to take precedence",
    "flush.size": "100000",
    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "_comment": "Schema compatibility mode between records with schemas (useful with schema-based converters; unused in this example, listed for completeness)",
    "schema.compatibility": "NONE",
    "_comment": "The class used to partition records into objects in S3. Here, partitioning based on time is used",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",
    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",
    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",
    "_comment": "The interval between record timestamps that triggers the upload of a new object to S3. Here, a small interval of 1 min is used for better visualization during the demo",
    "rotate.interval.ms": "60000",
    "_comment": "The class used to derive the timestamp for each record. Here, Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
The S3 connector can partition records in S3 in several ways. With the above properties, I have chosen to run the S3 connector using time-based partitioning and therefore group Kafka records into objects in S3 according to a timestamp. The timestamps I use here are the timestamps of the Kafka records themselves, but you can also specify a field in the record payload as the timestamp value to use.
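For example, each Meetup RSVP event carries an mtime field with the time the RSVP was last modified. To partition by that payload field instead of the Kafka record timestamp, the relevant properties would look roughly like this (a sketch based on the time-based partitioner's settings, assuming mtime is the field you want):
"timestamp.extractor": "RecordField",
"timestamp.field": "mtime"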
Grouping records in S3 objects by their Kafka timestamps is convenient, because the timestamps of Kafka records are ordered and monotonically increasing within a partition in Kafka. The fact that I rotate an object every minute (rotate.interval.ms) means that every new file that appears in S3 will contain records that were published in Kafka within a single minute.
Now that I’m only one REST API call away from starting the S3 connector, I’ll issue the call using the Confluent CLI:
confluent load meetups-to-s3 -d ./meetups-to-s3.json
The equivalent command using curl directly would be:
curl -s -X POST -H "Content-Type: application/json" --data @meetups-to-s3.json http://localhost:8083/connectors | jq
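Either way, once the call returns I can ask the worker for the connector's status through the standard Connect REST endpoint (assuming the worker's default port, 8083):
curl -s http://localhost:8083/connectors/meetups-to-s3/status | jq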
By inspecting the logs of the Connect worker, I’ve confirmed that the connector has started correctly:
INFO Starting connectors and tasks using config offset 9 
INFO Starting connector meetups-to-s3 
INFO Starting task meetups-to-s3-0 
INFO Creating task meetups-to-s3-0
Soon after, still at the INFO log level, you’ll see that the S3 connector first catches up with the messages that have been published already in Kafka and then uploads an object approximately every minute, according to the timestamps of the Kafka records:
INFO Files committed to S3. Target commit offset for meetups-0 is 12478
INFO Files committed to S3. Target commit offset for meetups-0 is 12613
INFO Files committed to S3. Target commit offset for meetups-0 is 12759
INFO Cluster ID: s5EbMmZQRdaDh-vARJ_WpQ
INFO Files committed to S3. Target commit offset for meetups-0 is 12863
INFO WorkerSinkTask{id=meetups-to-s3-0} Committing offsets asynchronously using sequence number 1: {meetups-0=OffsetAndMetadata{offset=12863, metadata=''}}
INFO Files committed to S3. Target commit offset for meetups-0 is 13035
INFO WorkerSinkTask{id=meetups-to-s3-0} Committing offsets asynchronously using sequence number 2: {meetups-0=OffsetAndMetadata{offset=13035, metadata=''}}
INFO Files committed to S3. Target commit offset for meetups-0 is 13205
INFO WorkerSinkTask{id=meetups-to-s3-0} Committing offsets asynchronously using sequence number 3: {meetups-0=OffsetAndMetadata{offset=13205, metadata=''}}
INFO Files committed to S3. Target commit offset for meetups-0 is 13339
INFO Cluster ID: s5EbMmZQRdaDh-vARJ_WpQ
INFO WorkerSinkTask{id=meetups-to-s3-0} Committing offsets asynchronously using sequence number 4: {meetups-0=OffsetAndMetadata{offset=13339, metadata=''}}
The view on the AWS S3 console also confirms the real-time upload of Meetup events from Kafka to objects in S3, with an object being created approximately every minute, as intended by our configuration.
[Figure: AWS S3 Console]
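The same check can be scripted with the AWS CLI. With the defaults used here, object keys follow the pattern topics/<topic>/<path.format>/<topic>+<partition>+<start offset>.json, so a recursive listing of the bucket shows a stream of new objects per Kafka partition (the example key below is illustrative):
aws s3 ls --recursive s3://meetups/topics/meetups/
# e.g. topics/meetups/date=2018-03-12/hour=17/meetups+0+0000012478.json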
But how is this possible on top of an eventually consistent object store? Read on to see how we approached the challenge of avoiding duplicates in the S3 connector.

How we implemented exactly once streaming on eventually consistent S3

If you play around a bit with the pipeline we defined above, for example by restarting the S3 connector a few times, you will notice a couple of things: No duplicates appear in your bucket, data upload continues from where it was left off, and no data is missed. How is this accomplished under the covers?
In order to provide the S3 connector with exactly once semantics, we relied on two simple techniques:
  1. S3 multipart uploads: This feature enables us to stream changes gradually in parts and in the end make the complete object available in S3 with one atomic operation.
  2. We utilize the fact that records in Kafka partitions are immutable. Therefore, if we upload the same range of offsets twice, we get the exact same file.
Starting with S3, the key feature of its SDK that allows this connector to deliver data without duplicates is multipart uploads. In S3, every upload is atomic: an object is either present as a whole in S3 or not present at all. However, for a connector that drains an event streaming platform, the ability to perform uploads incrementally, in a streaming fashion, is key.
Using multipart uploads, the S3 connector uploads each file gradually in parts, but this process is transparent to users. They only see the end result, which is the complete file. This makes the S3 connector a robust exactly once connector since, even in the presence of failures, the apps that read files from S3 will always read the same files and contents once they become available as complete objects in S3.
Furthermore, in the presence of failures or in between restarts, the connector is able to pick up data export where it left off. In a few cases, this might mean that it will have to re-upload parts of an upload that were not completed. Again, this does not affect downstream applications reading records from S3 since such applications see the whole file, which is always the same if the partitioner that is used distributes Kafka records the same way every time.
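One way to convince yourself of this is to note an object's ETag before restarting the connector and confirm it is unchanged afterwards: a completed object is never re-uploaded, and a retried upload of the same offset range produces a byte-identical object under the same key (the key below is illustrative; substitute one from your own bucket):
aws s3api head-object --bucket meetups \
  --key topics/meetups/date=2018-03-12/hour=17/meetups+0+0000012478.json --query ETag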
The value of Kafka is that it makes exactly once semantics efficient and robust. Because of S3’s eventual consistency, we don’t probe S3 to recover state. At the same time, we also refrain from using the local disk of Connect workers to track the connector’s progress. Kafka is treated as the sole source of truth. This fact simplifies recovery from faults significantly. The connector just needs to commit offsets to Kafka once an upload is successful. On every restart, the connector worker starts to export records immediately from where it left off.
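Because progress is stored in Kafka itself, it is visible with standard Kafka tooling: a sink connector consumes with a consumer group named after the connector (connect-<connector name> by default), so I can inspect its committed offsets and lag like this:
bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-meetups-to-s3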
By not persisting the data to local disks, the connector is able to run faster, while Kafka is responsible for resilience and Connect workers are used to scale up data export in a stateless fashion.
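Scaling up is then just a configuration change. Since the meetups topic has five partitions, I could, for example, raise tasks.max to 5 by resubmitting the configuration through the Connect REST API. This is a sketch that assumes jq is available and the worker runs on its default port; note that the config endpoint expects the bare configuration map, not the wrapper used by confluent load:
jq '.config + {"tasks.max":"5"}' meetups-to-s3.json | \
  curl -s -X PUT -H "Content-Type: application/json" --data-binary @- \
  http://localhost:8083/connectors/meetups-to-s3/config | jq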
Here are three examples that highlight how exactly once semantics are preserved in different situations:
  1. The S3 connector uploads a set of records to S3 using a partitioner that rolls a new file every 90 records. Let’s assume that each record takes 1 MB and the connector is configured to upload parts of 25 MB each. Let’s also assume that the connector starts consuming records from Kafka at offset 180. After the connector successfully uploads three parts of 25 MB each, the last part, with the remaining 15 MB, is uploaded and the connector completes the multipart upload. S3 assembles the parts into a single file; this is a very quick operation in S3. Finally, the S3 connector commits the offset of the last Kafka record that was uploaded successfully when this multipart upload completed. The next starting Kafka record offset will be 270.
    [Figure: Successful upload and offset commit]
  2. In a similar scenario, the S3 connector starts uploading another set of 90 records to S3, with starting offset 270. However, this time, somewhere in between uploading part three and part four, the Connect cluster experiences a hard failure, bringing the S3 connector offline for a short period of time. Even though this failure has interrupted the S3 connector from uploading the next 90 records in a single file, users on the S3 side will not notice the failure in the form of partial data, because the file has not become visible yet. Once the S3 connector is back online, it will resume execution from the latest committed Kafka record offset, which is still 270, and after the multipart upload of all four parts succeeds this time, it will make the new set of 90 records available as a new file on S3. The next Kafka offset to be consumed will be 360.
    [Figure: Failure during multipart upload and restart from latest committed offset]
  3. In a different failure scenario, the S3 connector successfully uploads another set of 90 records to S3, with starting Kafka record offset 360, in four parts. The multipart upload completes successfully on the S3 side. However, this time, the commit message that attempts to store the record offset back to Kafka is lost. In this case, the S3 connector will resume from its latest saved Kafka offset, which is again 360. In the meantime, the file that was previously uploaded to S3 is available to S3 users. In this scenario, the S3 connector avoids duplicates, because when the second attempt to upload the same 90 records succeeds, the previous file is overwritten in an atomic operation by the new file with the same name. This time the offset commit succeeds on the Kafka side, and the S3 connector is ready to consume new records from offset 450.
    [Figure: Failure during offset commit and restart from latest committed offset]

Conclusion

Since its initial release, the Kafka Connect S3 connector has been used to upload more than 75 PB of data from Kafka to S3. If you haven’t used it yet, give it a try by following the quick start or the demo as described. If you are one of the many users already running the S3 connector in production or development environments and you find a feature missing, feel free to let us know by contributing an issue or a pull request to the S3 connector’s GitHub repo.
If you’d like to know more, you can download the Confluent Platform and get started with the leading distribution of Apache Kafka.