Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.sksamuel.elastic4s.requests.searches

import com.sksamuel.elastic4s.{ElasticClient, HitReader, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, HitReader, RequestFailure, RequestSuccess}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
Expand All @@ -15,7 +15,10 @@ object SearchIterator {

/** Creates a new Iterator for instances of SearchHit by wrapping the given HTTP client.
*/
def hits(client: ElasticClient[Future], searchreq: SearchRequest)(implicit timeout: Duration): Iterator[SearchHit] =
def hits(client: ElasticClient[Future], searchreq: SearchRequest)(implicit
timeout: Duration,
options: CommonRequestOptions
): Iterator[SearchHit] =
new Iterator[SearchHit] {
require(searchreq.keepAlive.isDefined, "Search request must define keep alive value")

Expand Down Expand Up @@ -57,7 +60,8 @@ object SearchIterator {
*/
def iterate[T](client: ElasticClient[Future], searchreq: SearchRequest)(implicit
reader: HitReader[T],
timeout: Duration
timeout: Duration,
options: CommonRequestOptions
): Iterator[T] =
hits(client, searchreq).map(_.to[T])
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.sksamuel.elastic4s.akka.reactivestreams
import akka.actor.ActorRefFactory

import com.sksamuel.elastic4s.requests.searches.SearchRequest
import com.sksamuel.elastic4s.{ElasticClient, Indexes}
import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, Indexes}
import scala.concurrent.Future
import scala.concurrent.duration._

Expand Down Expand Up @@ -48,14 +48,22 @@ object ReactiveElastic {
}

def publisher(indexes: Indexes, elements: Long, keepAlive: String)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
publisher(search(indexes).query("*:*").scroll(keepAlive), elements)

def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
def publisher(q: SearchRequest)(implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
publisher(q, Long.MaxValue)

def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
def publisher(q: SearchRequest, elements: Long)(implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
new ScrollPublisher(client, q, elements)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.sksamuel.elastic4s.akka.reactivestreams
import akka.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.akka.reactivestreams.PublishActor.Ready
import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import org.slf4j.{Logger, LoggerFactory}
Expand All @@ -26,7 +26,9 @@ import scala.util.{Failure, Success}
* an Actor reference factory required by the publisher
*/
class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
) extends Publisher[SearchHit] {
require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher")

Expand All @@ -43,7 +45,9 @@ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], s
}

class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
) extends Subscription {

private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))
Expand All @@ -70,7 +74,12 @@ object PublishActor {
case class Request(n: Long)
}

class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)
class PublishActor(
client: ElasticClient[Future],
query: SearchRequest,
s: Subscriber[_ >: SearchHit],
max: Long
)(implicit options: CommonRequestOptions)
extends Actor
with Stash {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.pekko.reactivestreams

import org.apache.pekko.actor.ActorRefFactory
import com.sksamuel.elastic4s.requests.searches.SearchRequest
import com.sksamuel.elastic4s.{ElasticClient, Indexes}
import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, Indexes}

import scala.concurrent.Future
import scala.concurrent.duration._
Expand Down Expand Up @@ -48,14 +48,22 @@ object ReactiveElastic {
}

def publisher(indexes: Indexes, elements: Long, keepAlive: String)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
publisher(search(indexes).query("*:*").scroll(keepAlive), elements)

def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
def publisher(q: SearchRequest)(implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
publisher(q, Long.MaxValue)

def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
def publisher(q: SearchRequest, elements: Long)(implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
): ScrollPublisher =
new ScrollPublisher(client, q, elements)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.pekko.reactivestreams

import org.apache.pekko.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption
import com.sksamuel.elastic4s.pekko.reactivestreams.PublishActor.Ready
import org.reactivestreams.{Publisher, Subscriber, Subscription}
Expand All @@ -26,7 +26,9 @@ import scala.util.{Failure, Success}
* an Actor reference factory required by the publisher
*/
class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
) extends Publisher[SearchHit] {
require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher")

Expand All @@ -43,7 +45,9 @@ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], s
}

class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)(
implicit actorRefFactory: ActorRefFactory
implicit
actorRefFactory: ActorRefFactory,
options: CommonRequestOptions
) extends Subscription {

private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))
Expand All @@ -70,7 +74,12 @@ object PublishActor {
case class Request(n: Long)
}

class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)
class PublishActor(
client: ElasticClient[Future],
query: SearchRequest,
s: Subscriber[_ >: SearchHit],
max: Long
)(implicit options: CommonRequestOptions)
extends Actor
with Stash {

Expand Down