diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala index 8c2ce5e357..930f948e51 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala @@ -1,6 +1,6 @@ package com.sksamuel.elastic4s.requests.searches -import com.sksamuel.elastic4s.{ElasticClient, HitReader, RequestFailure, RequestSuccess} +import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, HitReader, RequestFailure, RequestSuccess} import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration @@ -15,7 +15,10 @@ object SearchIterator { /** Creates a new Iterator for instances of SearchHit by wrapping the given HTTP client. */ - def hits(client: ElasticClient[Future], searchreq: SearchRequest)(implicit timeout: Duration): Iterator[SearchHit] = + def hits(client: ElasticClient[Future], searchreq: SearchRequest)(implicit + timeout: Duration, + options: CommonRequestOptions + ): Iterator[SearchHit] = new Iterator[SearchHit] { require(searchreq.keepAlive.isDefined, "Search request must define keep alive value") @@ -57,7 +60,8 @@ object SearchIterator { */ def iterate[T](client: ElasticClient[Future], searchreq: SearchRequest)(implicit reader: HitReader[T], - timeout: Duration + timeout: Duration, + options: CommonRequestOptions ): Iterator[T] = hits(client, searchreq).map(_.to[T]) } diff --git a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala index b0d2ac776c..be6646aab8 100644 --- a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala +++ b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala @@ -3,7 +3,7 @@ package com.sksamuel.elastic4s.akka.reactivestreams import akka.actor.ActorRefFactory import com.sksamuel.elastic4s.requests.searches.SearchRequest -import com.sksamuel.elastic4s.{ElasticClient, Indexes} +import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, Indexes} import scala.concurrent.Future import scala.concurrent.duration._ @@ -48,14 +48,22 @@ object ReactiveElastic { } def publisher(indexes: Indexes, elements: Long, keepAlive: String)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ): ScrollPublisher = publisher(search(indexes).query("*:*").scroll(keepAlive), elements) - def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher = + def publisher(q: SearchRequest)(implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions + ): ScrollPublisher = publisher(q, Long.MaxValue) - def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher = + def publisher(q: SearchRequest, elements: Long)(implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions + ): ScrollPublisher = new ScrollPublisher(client, q, elements) } } diff --git a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala index 89c6a27e9e..aeae24f54a 100644 --- a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala +++ b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala @@ -3,7 +3,7 @@ package com.sksamuel.elastic4s.akka.reactivestreams import akka.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash} import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse} import com.sksamuel.elastic4s.akka.reactivestreams.PublishActor.Ready -import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} +import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption import org.reactivestreams.{Publisher, Subscriber, Subscription} import org.slf4j.{Logger, LoggerFactory} @@ -26,7 +26,9 @@ import scala.util.{Failure, Success} * an Actor reference factory required by the publisher */ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ) extends Publisher[SearchHit] { require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher") @@ -43,7 +45,9 @@ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], s } class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ) extends Subscription { private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max))) @@ -70,7 +74,12 @@ object PublishActor { case class Request(n: Long) } -class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long) +class PublishActor( + client: ElasticClient[Future], + query: SearchRequest, + s: Subscriber[_ >: SearchHit], + max: Long +)(implicit options: CommonRequestOptions) extends Actor with Stash { diff --git a/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala index 7a993ef909..be683bf94d 100644 --- a/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala +++ b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.ActorRefFactory import com.sksamuel.elastic4s.requests.searches.SearchRequest -import com.sksamuel.elastic4s.{ElasticClient, Indexes} +import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, Indexes} import scala.concurrent.Future import scala.concurrent.duration._ @@ -48,14 +48,22 @@ object ReactiveElastic { } def publisher(indexes: Indexes, elements: Long, keepAlive: String)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ): ScrollPublisher = publisher(search(indexes).query("*:*").scroll(keepAlive), elements) - def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher = + def publisher(q: SearchRequest)(implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions + ): ScrollPublisher = publisher(q, Long.MaxValue) - def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher = + def publisher(q: SearchRequest, elements: Long)(implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions + ): ScrollPublisher = new ScrollPublisher(client, q, elements) } } diff --git a/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala index 5dfed539ad..c2f31f9ac4 100644 --- a/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala +++ b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash} import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse} -import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} +import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption import com.sksamuel.elastic4s.pekko.reactivestreams.PublishActor.Ready import org.reactivestreams.{Publisher, Subscriber, Subscription} @@ -26,7 +26,9 @@ import scala.util.{Failure, Success} * an Actor reference factory required by the publisher */ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ) extends Publisher[SearchHit] { require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher") @@ -43,7 +45,9 @@ class ScrollPublisher private[reactivestreams] (client: ElasticClient[Future], s } class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)( - implicit actorRefFactory: ActorRefFactory + implicit + actorRefFactory: ActorRefFactory, + options: CommonRequestOptions ) extends Subscription { private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max))) @@ -70,7 +74,12 @@ object PublishActor { case class Request(n: Long) } -class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long) +class PublishActor( + client: ElasticClient[Future], + query: SearchRequest, + s: Subscriber[_ >: SearchHit], + max: Long +)(implicit options: CommonRequestOptions) extends Actor with Stash {