Kafka

Around 2 minutes

Apache Kafka is a streaming platform that can be used to ship messages around the place.

Calculating (deprecated) DefaultPartititioner assigned partition

When creating a Kafka message, it's possible to set a key in order to ensure that all messages with that same key end up on the same partition.

Previously, the default strategy (aptly named DefaultPartitioner) used to be that the message key attached to a message would be hashed using murmur2.

You could use this calculator combined with the default Kafka seed (9747b28c) to figure out the murmur hash and then modulo by the number of partitions you had.

For example:

0974728 (seed) + 12413413 (input) -> 3242098085 (hash)
3242098085 (hash) % 16 (partition)
= 5
0974728 (seed) + 12413413 (input) -> 3242098085 (hash)
3242098085 (hash) % 16 (partition)
= 5

This would mean that a message with a key of 12413413 sent to a topic with 16 partitions, would be assigned to Partition 5

Calculating hash strategy for kafka-go

Nowadays, the default partition is apparently UniformStickyPartitioner although at the time of writing IBM/sarama defaults to HashPartitioner and segmentio/kafka-go has round-robin as its default.

When it comes to the hash strategy, I recently found myself wondering how to determine the assigned partition for a program using kafka-go's hash partition.

This lead to me slicing up the default hasher implementation into a small Go playground program.

NOTE: While I've manually run the testcases through this small program and they all passed, I haven't exercised it in any depth. It's just a reference for myself in the future. You should probably use the actual library directly.

package main
 
import (
	"fmt"
	"hash"
	"hash/fnv"
)
 
func main() {
	partitions := generatePartitions(3)
	key := "blah"
 
	hasher := fnv.New32a().(hash.Hash32)
	hasher.Reset()
 
	if _, err := hasher.Write([]byte(key)); err != nil {
		panic(err)
	}
 
	partition := int32(hasher.Sum32()) % int32(len(partitions))
	if partition < 0 {
		partition = -partition
	}
	fmt.Println(partition)
}
 
func generatePartitions(partitionCount int) []int {
	partitions := []int{}
	for i := 0; i <= partitionCount-1; i++ {
		partitions = append(partitions, i)
	}
	return partitions
}
package main
 
import (
	"fmt"
	"hash"
	"hash/fnv"
)
 
func main() {
	partitions := generatePartitions(3)
	key := "blah"
 
	hasher := fnv.New32a().(hash.Hash32)
	hasher.Reset()
 
	if _, err := hasher.Write([]byte(key)); err != nil {
		panic(err)
	}
 
	partition := int32(hasher.Sum32()) % int32(len(partitions))
	if partition < 0 {
		partition = -partition
	}
	fmt.Println(partition)
}
 
func generatePartitions(partitionCount int) []int {
	partitions := []int{}
	for i := 0; i <= partitionCount-1; i++ {
		partitions = append(partitions, i)
	}
	return partitions
}