Kafka
Apache Kafka is a streaming platform that can be used to ship messages around the place.
Calculating (deprecated) DefaultPartitioner assigned partition
When creating a Kafka message, it's possible to set a key in order to ensure that all messages with that same key end up on the same partition.
Previously, the default strategy (aptly named DefaultPartitioner) was that the key attached to a message would be hashed using murmur2.
You could use this calculator combined with the default Kafka seed (9747b28c) to figure out the murmur hash and then take it modulo the number of partitions you had.
For example:
9747b28c (seed) + 12413413 (input) -> 3242098085 (hash)
3242098085 (hash) % 16 (partition)
= 5
This would mean that a message with a key of 12413413, sent to a topic with 16 partitions, would be assigned to partition 5.
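If you'd rather not rely on an online calculator, the same calculation can be sketched in Go. This is a minimal sketch rather than the Kafka client's own code: murmur2 and partitionFor are names I've made up, the hash is a 32-bit MurmurHash2 with the seed mentioned above, and the masking of the sign bit before the modulo is my understanding of what the Java client does. For a power-of-two partition count such as 16, the mask doesn't change the result.

```go
package main

import "fmt"

// murmur2 is a 32-bit MurmurHash2 using the seed mentioned above (0x9747b28c).
// The function name is illustrative, not taken from any library.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	length := len(data)
	h := seed ^ uint32(length)

	// Mix the input four bytes at a time (little-endian).
	for i := 0; i+4 <= length; i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 | uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Fold in the remaining 1-3 bytes, if any.
	tail := data[length&^3:]
	switch len(tail) {
	case 3:
		h ^= uint32(tail[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(tail[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(tail[0])
		h *= m
	}

	// Final avalanche.
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// partitionFor (hypothetical helper) clears the sign bit and takes the
// hash modulo the partition count. The masking step is an assumption
// about the Java client's behaviour.
func partitionFor(key []byte, numPartitions int) int {
	positive := murmur2(key) & 0x7fffffff
	return int(positive % uint32(numPartitions))
}

func main() {
	key := []byte("12413413")
	fmt.Println(murmur2(key), partitionFor(key, 16))

	// Sanity check worked through by hand: "abc" hashes to 479470107,
	// which lands on partition 11 of 16.
	fmt.Println(murmur2([]byte("abc")), partitionFor([]byte("abc"), 16))
}
```

As with the calculator, treat the output as a cross-check rather than a guarantee of what a given client version will do.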
Calculating hash strategy for kafka-go
Nowadays, the default partitioner is apparently UniformStickyPartitioner, although at the time of writing IBM/sarama defaults to HashPartitioner and segmentio/kafka-go has round-robin as its default.
When it comes to the hash strategy, I recently found myself wondering how to determine the assigned partition for a program using kafka-go's hash partitioner.
This led to me slicing up the default hasher implementation into a small Go playground program.
NOTE: While I've manually run the test cases through this small program and they all passed, I haven't exercised it in any depth. It's just a reference for myself in the future. You should probably use the actual library directly.
package main

import (
	"fmt"
	"hash/fnv"
)

func main() {
	partitions := generatePartitions(3)
	key := "blah"

	// Hash the key with 32-bit FNV-1a.
	hasher := fnv.New32a()
	hasher.Reset()
	if _, err := hasher.Write([]byte(key)); err != nil {
		panic(err)
	}

	// Cast to int32 before the modulo; the result can be negative,
	// so flip the sign to get a valid partition index.
	partition := int32(hasher.Sum32()) % int32(len(partitions))
	if partition < 0 {
		partition = -partition
	}

	fmt.Println(partition)
}

// generatePartitions returns the partition IDs 0..partitionCount-1.
func generatePartitions(partitionCount int) []int {
	partitions := make([]int, 0, partitionCount)
	for i := 0; i < partitionCount; i++ {
		partitions = append(partitions, i)
	}
	return partitions
}
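The detail that's easy to miss in the program above is the int32 cast before the modulo. Here's a small sketch that pulls the arithmetic into its own function so the effect is visible. partitionFromHash and hashPartition are names I've invented for illustration, and the extra keys are made up; only the arithmetic is taken from the program above.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFromHash mirrors the arithmetic in the program above:
// cast the hash to int32, take the modulo, then flip a negative result.
func partitionFromHash(sum uint32, partitionCount int) int32 {
	p := int32(sum) % int32(partitionCount)
	if p < 0 {
		p = -p
	}
	return p
}

// hashPartition (hypothetical helper) hashes the key with 32-bit FNV-1a
// and maps it onto one of partitionCount partitions.
func hashPartition(key string, partitionCount int) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return partitionFromHash(h.Sum32(), partitionCount)
}

func main() {
	for _, key := range []string{"blah", "order-1", "order-2"} {
		fmt.Printf("%q -> partition %d\n", key, hashPartition(key, 3))
	}

	// A hash with the top bit set is negative as an int32, so the signed
	// result differs from a plain unsigned modulo.
	var sum uint32 = 0xFFFFFFFF
	fmt.Println(partitionFromHash(sum, 3), sum%3) // prints "1 0"
}
```

In other words, reimplementing this with an unsigned modulo would quietly put some keys on a different partition, which is one more reason to lean on the library itself.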