Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |

| Key | Type | Req? | Notes |
|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |
| :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts (consumer options) |
| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, default value is 1. |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assigning :ketu.source/poll-error-handler overrides the definition of :ketu.source/error-skip-offset-amount. If the first is set, the latter is ignored. This is confusing for a user of the library.

I think we should support one or the either, or document that they are mutually exclusive

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that the provided error handler may choose not to use the error-skip-offset-amount opt?


#### Producer-sink options

Expand Down Expand Up @@ -145,7 +148,8 @@ for example when managing the offset manually, auto-commit should usually set to

In this example we use the decorator to run commands in the polling thread context.
The consumer is paused/resumed based on commands sent from the application.
The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn).
The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn).

```clojure
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
Expand Down Expand Up @@ -226,9 +230,10 @@ The decorator processes all immediately available commands in the commands-chan,
```

## Java Kafka client versions

- `ketu` version 1.0.0+ uses `org.apache.kafka/kafka-clients` version 3.3.1
- `ketu` version 2.1.0+ uses `org.apache.kafka/kafka-clients` version 3.9.1
- For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md)
- For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md)

## Development & Contribution

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/ketu "2.1.0"
(defproject com.appsflyer/ketu "2.2.0-SNAPSHOT"
:description "Clojure Apache Kafka client with core.async api"
:url "https://github.com/AppsFlyer/ketu"
:license {:name "Apache License, Version 2.0"
Expand Down
Loading