Stream Decorator Reference

cartesianProduct

The cartesianProduct function turns a single tuple with a multi-valued field (i.e., an array) into multiple tuples, one for each value in the array field. That is, given a single tuple containing an array of N values for fieldA, the cartesianProduct function will output N tuples, each with one value from the original tuple’s array. In essence, you can flatten arrays for further processing.

For example, using cartesianProduct you can turn this tuple:

{
  "fieldA": "foo",
  "fieldB": ["bar","baz","bat"]
}

into the following 3 tuples:

{
  "fieldA": "foo",
  "fieldB": "bar"
}
{
  "fieldA": "foo",
  "fieldB": "baz"
}
{
  "fieldA": "foo",
  "fieldB": "bat"
}

cartesianProduct Parameters

  • incoming stream: (Mandatory) A single incoming stream.

  • fieldName or evaluator: (Mandatory) Name of field to flatten values for, or evaluator whose result should be flattened.

  • productSort='fieldName ASC|DESC': (Optional) Sort order of the newly generated tuples.

cartesianProduct Syntax

cartesianProduct(
  <stream>,
  <fieldName | evaluator> [as newFieldName],
  productSort='fieldName ASC|DESC'
)

cartesianProduct Examples

The following examples show different outputs for this source tuple:

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
}

Single Field, No Sorting

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}

Single Evaluator, No Sorting

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}

Single Field, Sorted by Value

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}

Single Evaluator, Sorted by Evaluator Values

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  productSort="newFieldE desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}

Renamed Single Field, Sorted by Value

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB as newFieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB2",
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB1",
}

Multiple Fields, No Sorting

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

Multiple Fields, Sorted by Single Field

cartesianProduct(
  search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

Multiple Fields, Sorted by Multiple Fields

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc, fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}

Field and Evaluator, No Sorting

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 14
}

As you can see in the examples above, the cartesianProduct function does support flattening tuples across multiple fields and/or evaluators.

classify

The classify function classifies tuples using a logistic regression text classification model. It was designed specifically to work with models trained using the train function. The classify function uses the model function to retrieve a stored model and then scores a stream of tuples using the model. The tuples read by the classifier must contain a text field that can be used for classification. The classify function uses a Lucene analyzer to extract the features from the text so the model can be applied. By default the classify function looks for the analyzer using the name of text field in the tuple. If the Solr schema on the worker node does not contain this field, the analyzer can be looked up in another field by specifying the analyzerField parameter.

Each tuple that is classified is assigned two scores:

  • probability_d*: A float between 0 and 1 which describes the probability that the tuple belongs to the class. This is useful in the classification use case.

  • score_d*: The score of the document that has not be squashed between 0 and 1. The score may be positive or negative. The higher the score the better the document fits the class. This un-squashed score will be useful in query re-ranking and recommendation use cases. This score is particularly useful when multiple high ranking documents have a probability_d score of 1, which won’t provide a meaningful ranking between documents.

classify Parameters

  • model expression: (Mandatory) Retrieves the stored logistic regression model.

  • field: (Mandatory) The field in the tuples to apply the classifier to. By default the analyzer for this field in the schema will be used extract the features.

  • analyzerField: (Optional) Specifies a different field to find the analyzer from in the schema.

classify Syntax

classify(model(modelCollection,
             id="model1",
             cacheMillis=5000),
         search(contentCollection,
             q="id:(a b c)",
             qt="/export",
             fl="text_t, id",
             sort="id asc"),
             field="text_t")

In the example above the classify expression is retrieving the model using the model function. It is then classifying tuples returned by the search function. The text_t field is used for the text classification and the analyzer for the text_t field in the Solr schema is used to analyze the text and extract the features.

commit

The commit function wraps a single stream (A) and given a collection and batch size will send commit messages to the collection when the batch size is fulfilled or the end of stream is reached. A commit stream is used most frequently with an update stream and as such the commit will take into account possible summary tuples coming from the update stream. All tuples coming into the commit stream will be returned out of the commit stream - no tuples will be dropped and no tuples will be added.

commit Parameters

  • collection: The collection to send commit messages to (required)

  • batchSize: The commit batch size, sends commit message when batch size is hit. If not provided (or provided as value 0) then a commit is only sent at the end of the incoming stream.

  • waitFlush: The value passed directly to the commit handler (true/false, default: false)

  • waitSearcher: The value passed directly to the commit handler (true/false, default: false)

  • softCommit: The value passed directly to the commit handler (true/false, default: false)

  • StreamExpression for StreamA (required)

commit Syntax

commit(
    destinationCollection,
    batchSize=2,
    update(
        destinationCollection,
        batchSize=5,
        search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f,s_multi,i_multi", sort="a_f asc, a_i asc")
    )
)

complement

The complement function wraps two streams (A and B) and emits tuples from A which do not exist in B. The tuples are emitted in the order in which they appear in stream A. Both streams must be sorted by the fields being used to determine equality (using the on parameter).

complement Parameters

  • StreamExpression for StreamA

  • StreamExpression for StreamB

  • on: Fields to be used for checking equality of tuples between A and B. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

complement Syntax

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

daemon

The daemon function wraps another function and runs it at intervals using an internal thread. The daemon function can be used to provide both continuous push and pull streaming.

Continuous Push Streaming

With continuous push streaming the daemon function wraps another function and is then sent to the /stream handler for execution. The /stream handler recognizes the daemon function and keeps it resident in memory, so it can run its internal function at intervals.

In order to facilitate the pushing of tuples, the daemon function must wrap another stream decorator that pushes the tuples somewhere. One example of this is the update function, which wraps a stream and sends the tuples to another SolrCloud collection for indexing.

daemon Syntax

daemon(id="uniqueId",
       runInterval="1000",
       terminate="true",
       update(destinationCollection,
              batchSize=100,
              topic(checkpointCollection,
                    topicCollection,
                    q="topic query",
                    fl="id, title, abstract, text",
                    id="topicId",
                    initialCheckpoint=0)
               )
        )

The sample code above shows a daemon function wrapping an update function, which is wrapping a topic function. When this expression is sent to the /stream handler, the /stream hander sees the daemon function and keeps it in memory where it will run at intervals. In this particular example, the daemon function will run the update function every second. The update function is wrapping a topic function, which will stream tuples that match the topic function query in batches. Each subsequent call to the topic will return the next batch of tuples for the topic. The update function will send all the tuples matching the topic to another collection to be indexed. The terminate parameter tells the daemon to terminate when the topic function stops sending tuples.

The effect of this is to push documents that match a specific query into another collection. Custom push functions can be plugged in that push documents out of Solr and into other systems, such as Kafka or an email system.

Push streaming can also be used for continuous background aggregation scenarios where aggregates are rolled up in the background at intervals and pushed to other Solr collections. Another use case is continuous background machine learning model optimization, where the optimized model is pushed to another Solr collection where it can be integrated into queries.

The /stream handler supports a small set of commands for listing and controlling daemon functions:

http://localhost:8983/collection/stream?action=list

This command will provide a listing of the current daemons running on the specific node along with their current state.

http://localhost:8983/collection/stream?action=stop&id=daemonId

This command will stop a specific daemon function but leave it resident in memory.

http://localhost:8983/collection/stream?action=start&id=daemonId

This command will start a specific daemon function that has been stopped.

http://localhost:8983/collection/stream?action=kill&id=daemonId

This command will stop a specific daemon function and remove it from memory.

Continuous Pull Streaming

The DaemonStream java class (part of the SolrJ libraries) can also be embedded in a java application to provide continuous pull streaming. Sample code:

StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);

Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello");  // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents

TopicStream topicStream = new TopicStream(zkHost,        // Host address for the ZooKeeper service housing the collections
                                         "checkpoints",  // The collection to store the topic checkpoints
                                         "topicData",    // The collection to query for the topic records
                                         "topicId",      // The id of the topic
                                         -1,             // checkpoint every X tuples, if set -1 it will checkpoint after each run.
                                          topicQueryParams); // The query parameters for the TopicStream

DaemonStream daemonStream = new DaemonStream(topicStream,             // The underlying stream to run.
                                             "daemonId",              // The id of the daemon
                                             1000,                    // The interval at which to run the internal stream
                                             500);                    // The internal queue size for the daemon stream. Tuples will be placed in the queue
                                                                      // as they are read by the internal internal thread.
                                                                      // Calling read() on the daemon stream reads records from the internal queue.

daemonStream.setStreamContext(context);

daemonStream.open();

//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
    Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
                                      // The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
    //Do something with the tuples
}

// Shutdown the DaemonStream.
daemonStream.shutdown();

//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.

while(true) {
    Tuple tuple = daemonStream.read();
    if(tuple.EOF) {
        break;
    } else {
        //Do something with the tuples.
    }
}
//Finally close the stream
daemonStream.close();

delete

The delete function wraps other functions and uses the id and _version_ values found to send the tuples to a SolrCloud collection as Delete By Id commands.

This is similar to the update() function described below.

delete Parameters

  • destinationCollection: (Mandatory) The collection where the tuples will deleted.

  • batchSize: (Mandatory) The indexing batch size.

  • pruneVersionField: (Optional, defaults to false) Whether to prune _version_ values from tuples

  • StreamExpression: (Mandatory)

delete Syntax

 delete(collection1
        batchSize=500,
        search(collection1,
               q=old_data:true,
               qt="/export",
               fl="id",
               sort="a_f asc, a_i asc"))

The example above consumes the tuples returned by the search function against collection1 and converts the id value of each document found into a delete request against the same collection1.

Unlike the update() function, delete() defaults to pruneVersionField=false — preserving any _version_ values found in the inner stream when converting the tuples to "Delete By ID" requests. This ensures that using this stream will not (by default) result in deleting any documents that were updated after the search(…​) was executed, but before the delete(…​) processed that tuple (leveraging Optimistic concurrency constraints).

Users who wish to ignore concurrent updates and delete all matched documents should set pruneVersionField=true (or ensure that the inner stream tuples do not include any _version_ values).

Users who anticipate concurrent updates, and wish to "skip" any failed deletes, should consider configuring the TolerantUpdateProcessorFactory

eval

The eval function allows for use cases where new streaming expressions are generated on the fly and then evaluated. The eval function wraps a streaming expression and reads a single tuple from the underlying stream. The eval function then retrieves a string Streaming Expressions from the expr_s field of the tuple. The eval function then compiles the string Streaming Expression and emits the tuples.

eval Parameters

  • StreamExpression: (Mandatory) The stream which provides the streaming expression to be evaluated.

eval Syntax

eval(expr)

In the example above the eval expression reads the first tuple from the underlying expression. It then compiles and executes the string Streaming Expression in the expr_s field.

executor

The executor function wraps a stream source that contains streaming expressions, and executes the expressions in parallel. The executor function looks for the expression in the expr_s field in each tuple. The executor function has an internal thread pool that runs tasks that compile and run expressions in parallel on the same worker node. This function can also be parallelized across worker nodes by wrapping it in the parallel function to provide parallel execution of expressions across a cluster.

The executor function does not do anything specific with the output of the expressions that it runs. Therefore the expressions that are executed must contain the logic for pushing tuples to their destination. The update function can be included in the expression being executed to send the tuples to a SolrCloud collection for storage.

This model allows for asynchronous execution of jobs where the output is stored in a SolrCloud collection where it can be accessed as the job progresses.

executor Parameters

  • threads: (Optional) The number of threads in the executors thread pool for executing expressions.

  • StreamExpression: (Mandatory) The stream source which contains the Streaming Expressions to execute.

executor Syntax

daemon(id="myDaemon",
       terminate="true",
       executor(threads=10,
                topic(checkpointCollection
                      storedExpressions,
                      q="*:*",
                      fl="id, expr_s",
                      initialCheckPoint=0,
                      id="myTopic")))

In the example above a daemon wraps an executor, which wraps a topic that is returning tuples with expressions to execute. When sent to the stream handler, the daemon will call the executor at intervals which will cause the executor to read from the topic and execute the expressions found in the expr_s field. The daemon will repeatedly call the executor until all the tuples that match the topic have been iterated, then it will terminate. This is the approach for executing batches of streaming expressions from a topic queue.

fetch

The fetch function iterates a stream and fetches additional fields and adds them to the tuples. The fetch function fetches in batches to limit the number of calls back to Solr. Tuples streamed from the fetch function will contain the original fields and the additional fields that were fetched. The fetch function supports one-to-one fetches. Many-to-one fetches, where the stream source contains duplicate keys, will also work, but one-to-many fetches are currently not supported by this function.

fetch Parameters

  • Collection: (Mandatory) The collection to fetch the fields from.

  • StreamExpression: (Mandatory) The stream source for the fetch function.

  • fl: (Mandatory) The fields to be fetched.

  • on: Fields to be used for checking equality of tuples between stream source and fetched records. Formatted as on="fieldNameInTuple=fieldNameInCollection".

  • batchSize: (Optional) The batch fetch size.

fetch Syntax

fetch(addresses,
      search(people, q="*:*", qt="/export", fl="username, firstName, lastName", sort="username asc"),
      fl="streetAddress, city, state, country, zip",
      on="username=userId")

The example above fetches addresses for users by matching the username in the tuple with the userId field in the addresses collection.

having

The having expression wraps a stream and applies a boolean operation to each tuple. It emits only tuples for which the boolean operation returns true.

having Parameters

  • StreamExpression: (Mandatory) The stream source for the having function.

  • booleanEvaluator: (Mandatory) The following boolean operations are supported: eq (equals), gt (greater than), lt (less than), gteq (greater than or equal to), lteq (less than or equal to), and, or, eor (exclusive or), and not. Boolean evaluators can be nested with other evaluators to form complex boolean logic.

The comparison evaluators compare the value in a specific field with a value, whether a string, number, or boolean. For example: eq(field1, 10), returns true if field1 is equal to 10.

having Syntax

having(rollup(over=a_s,
              sum(a_i),
              search(collection1,
                     q="*:*",
                     qt="/export",
                     fl="id,a_s,a_i,a_f",
                     sort="a_s asc")),
       and(gt(sum(a_i), 100), lt(sum(a_i), 110)))

In this example, the having expression iterates the aggregated tuples from the rollup expression and emits all tuples where the field sum(a_i) is greater then 100 and less then 110.

leftOuterJoin

The leftOuterJoin function wraps two streams, Left and Right, and emits tuples from Left. If there is a tuple in Right equal (as defined by on) then the values in that tuple will be included in the emitted tuple. An equal tuple in Right need not exist for the Left tuple to be emitted. This supports one-to-one, one-to-many, many-to-one, and many-to-many left outer join scenarios. The tuples are emitted in the order in which they appear in the Left stream. Both streams must be sorted by the fields being used to determine equality (using the on parameter). If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple.

You can wrap the incoming streams with a select function to be specific about which field values are included in the emitted tuple.

leftOuterJoin Parameters

  • StreamExpression for StreamLeft

  • StreamExpression for StreamRight

  • on: Fields to be used for checking equality of tuples between Left and Right. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

leftOuterJoin Syntax

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

hashJoin

The hashJoin function wraps two streams, Left and Right, and for every tuple in Left which exists in Right will emit a tuple containing the fields of both tuples. This supports one-to-one, one-to-many, many-to-one, and many-to-many inner join scenarios. The tuples are emitted in the order in which they appear in the Left stream. The order of the streams does not matter. If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple.

You can wrap the incoming streams with a select function to be specific about which field values are included in the emitted tuple.

The hashJoin function can be used when the tuples of Left and Right cannot be put in the same order. Because the tuples are out of order this stream functions by reading all values from the Right stream during the open operation and will store all tuples in memory. The result of this is a memory footprint equal to the size of the Right stream.

hashJoin Parameters

  • StreamExpression for StreamLeft

  • hashed=StreamExpression for StreamRight

  • on: Fields to be used for checking equality of tuples between Left and Right. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

hashJoin Syntax

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

hashJoin(
  search(people, q="*:*", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

innerJoin

Wraps two streams, Left and Right. For every tuple in Left which exists in Right a tuple containing the fields of both tuples will be emitted. This supports one-to-one, one-to-many, many-to-one, and many-to-many inner join scenarios. The tuples are emitted in the order in which they appear in the Left stream. Both streams must be sorted by the fields being used to determine equality (the 'on' parameter). If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple. You can wrap the incoming streams with a select(…​) expression to be specific about which field values are included in the emitted tuple.

innerJoin Parameters

  • StreamExpression for StreamLeft

  • StreamExpression for StreamRight

  • on: Fields to be used for checking equality of tuples between Left and Right. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

innerJoin Syntax

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

intersect

The intersect function wraps two streams, A and B, and emits tuples from A which DO exist in B. The tuples are emitted in the order in which they appear in stream A. Both streams must be sorted by the fields being used to determine equality (the on parameter). Only tuples from A are emitted.

intersect Parameters

  • StreamExpression for StreamA

  • StreamExpression for StreamB

  • on: Fields to be used for checking equality of tuples between A and B. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

intersect Syntax

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

list

The list function wraps N Stream Expressions and opens and iterates each stream sequentially. This has the effect of concatenating the results of multiple Streaming Expressions.

list Parameters

  • StreamExpressions …​: N Streaming Expressions

list Syntax

list(tuple(a="hello world"), tuple(a="HELLO WORLD"))

list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
     search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
     tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

merge

The merge function merges two or more streaming expressions and maintains the ordering of the underlying streams. Because the order is maintained, the sorts of the underlying streams must line up with the on parameter provided to the merge function.

merge Parameters

  • StreamExpression A

  • StreamExpression B

  • Optional StreamExpression C,D,…​.Z

  • on: Sort criteria for performing the merge. Of the form fieldName order where order is asc or desc. Multiple fields can be provided in the form fieldA order, fieldB order.

merge Syntax

# Merging two stream expressions together
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,fieldA,fieldB,fieldC",
             sort="fieldA asc, fieldB desc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,fieldA",
             sort="fieldA asc"),
      search(collection2,
             q="id:(10 11 13)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      search(collection3,
             q="id:(987)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      on="fieldA asc")

null

The null expression is a useful utility function for understanding bottlenecks when performing parallel relational algebra (joins, intersections, rollups etc.). The null function reads all the tuples from an underlying stream and returns a single tuple with the count and processing time. Because the null stream adds minimal overhead of its own, it can be used to isolate the performance of Solr’s /export handler. If the /export handlers performance is not the bottleneck, then the bottleneck is likely occurring in the workers where the stream decorators are running.

The null expression can be wrapped by the parallel function and sent to worker nodes. In this scenario each worker will return one tuple with the count of tuples processed on the worker and the timing information for that worker. This gives valuable information such as:

  1. As more workers are added does the performance of the /export handler improve or not.
  2. Are tuples being evenly distributed across the workers, or is the hash partitioning sending more documents to a single worker.
  3. Are all workers processing data at the same speed, or is one of the workers the source of the bottleneck.

null Parameters

  • StreamExpression: (Mandatory) The expression read by the null function.

null Syntax

 parallel(workerCollection,
          null(search(collection1, q="*:*", fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
          workers="20",
          zkHost="localhost:9983",
          sort="a_s desc")

The expression above shows a parallel function wrapping a null function. This will cause the null function to be run in parallel across 20 worker nodes. Each worker will return a single tuple with number of tuples processed and time it took to iterate the tuples.

outerHashJoin

The outerHashJoin function wraps two streams, Left and Right, and emits tuples from Left. If there is a tuple in Right equal (as defined by the on parameter) then the values in that tuple will be included in the emitted tuple. An equal tuple in Right need not exist for the Left tuple to be emitted. This supports one-to-one, one-to-many, many-to-one, and many-to-many left outer join scenarios. The tuples are emitted in the order in which they appear in the Left stream. The order of the streams does not matter. If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple.

You can wrap the incoming streams with a select function to be specific about which field values are included in the emitted tuple.

The outerHashJoin stream can be used when the tuples of Left and Right cannot be put in the same order. Because the tuples are out of order, this stream functions by reading all values from the Right stream during the open operation and will store all tuples in memory. The result of this is a memory footprint equal to the size of the Right stream.

outerHashJoin Parameters

  • StreamExpression for StreamLeft

  • hashed=StreamExpression for StreamRight

  • on: Fields to be used for checking equality of tuples between Left and Right. Can be of the format on="fieldName", on="fieldNameInLeft=fieldNameInRight", or on="fieldName, otherFieldName=rightOtherFieldName".

outerHashJoin Syntax

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

parallel

The parallel function wraps a streaming expression and sends it to N worker nodes to be processed in parallel.

The parallel function requires that the partitionKeys parameter be provided to the underlying searches. The partitionKeys parameter will partition the search results (tuples) across the worker nodes. Tuples with the same values as partitionKeys will be shuffled to the same worker nodes.

The parallel function maintains the sort order of the tuples returned by the worker nodes, so the sort criteria must incorporate the sort order of the tuples returned by the workers.

For example if you sort on year, month and day you could partition on year only as long as there are enough different years to spread the tuples around the worker nodes.

Solr allows sorting on more than 4 fields, but you cannot specify more than 4 partitionKeys for speed considerations. Also it’s overkill to specify many partitionKeys when we one or two keys could be enough to spread the tuples.

Parallel stream was designed when the underlying search stream will emit a lot of tuples from the collection. If the search stream only emits a small subset of the data from the collection using parallel could potentially be slower.

Worker Collections

The worker nodes can be from the same collection as the data, or they can be a different collection entirely, even one that only exists for parallel streaming expressions. A worker collection can be any SolrCloud collection that has the /stream handler configured. Unlike normal SolrCloud collections, worker collections don’t have to hold any data. Worker collections can be empty collections that exist only to execute streaming expressions.

parallel Parameters

  • collection: Name of the worker collection to send the StreamExpression to.

  • StreamExpression: Expression to send to the worker collection.

  • workers: Number of workers in the worker collection to send the expression to.

  • zkHost: (Optional) The ZooKeeper connect string where the worker collection resides.

  • sort: The sort criteria for ordering tuples returned by the worker nodes.

parallel Syntax

 parallel(workerCollection,
          rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
                 over="year_i", count(*)),
          workers="20",
          zkHost="localhost:9983",
          sort="year_i desc")

The expression above shows a parallel function wrapping a rollup function. This will cause the rollup function to be run in parallel across 20 worker nodes.

Warmup

The parallel function uses the hash query parser to split the data amongst the workers. It executes on all the documents and the result bitset is cached in the filterCache.

+ For a parallel stream with the same number of workers and partitonKeys the first query would be slower than subsequent queries. A trick to not pay the penalty for the first slow query would be to use a warmup query for every new searcher. The following is a solrconfig.xml snippet for 2 workers and "year_i" as the partionKeys.

<listener event="newSearcher" class="solr.QuerySenderListener">
<arr name="queries">
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=0}</str><str name="partitionKeys">year_i</str></lst>
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=1}</str><str name="partitionKeys">year_i</str></lst>
</arr>
</listener>

plist

The plist function wraps N Stream Expressions and opens the streams in parallel and iterates each stream sequentially. The difference between the list and plist is that the streams are opened in parallel. Since many streams such as facet, stats and significantTerms push down heavy operations to Solr when they are opened, the plist function can dramatically improve performance by doing these operations in parallel.

plist Parameters

  • StreamExpressions …​: N Streaming Expressions

plist Syntax

plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))

plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
      search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
      tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

priority

The priority function is a simple priority scheduler for the executor function. The executor function doesn’t directly have a concept of task prioritization; instead it simply executes tasks in the order that they are read from its underlying stream. The priority function provides the ability to schedule a higher priority task ahead of lower priority tasks that were submitted earlier.

The priority function wraps two topics that are both emitting tuples that contain streaming expressions to execute. The first topic is considered the higher priority task queue.

Each time the priority function is called, it checks the higher priority task queue to see if there are any tasks to execute. If tasks are waiting in the higher priority queue then the priority function will emit the higher priority tasks. If there are no high priority tasks to run, the lower priority queue tasks are emitted.

The priority function will only emit a batch of tasks from one of the queues each time it is called. This ensures that no lower priority tasks are executed until the higher priority queue has no tasks to run.

priority Parameters

  • topic expression: (Mandatory) the high priority task queue

  • topic expression: (Mandatory) the lower priority task queue

priority Syntax

daemon(id="myDaemon",
       executor(threads=10,
                priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
                         topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))

In the example above the daemon function is calling the executor iteratively. Each time it’s called, the executor function will execute the tasks emitted by the priority function. The priority function wraps two topics. The first topic is the higher priority task queue, the second topics is the lower priority topic.

reduce

The reduce function wraps an internal stream and groups tuples by common fields.

Each tuple group is operated on as a single block by a pluggable reduce operation. The group operation provided with Solr implements distributed grouping functionality. The group operation also serves as an example reduce operation that can be referred to when building custom reduce operations.

The reduce function relies on the sort order of the underlying stream. Accordingly the sort order of the underlying stream must be aligned with the group by field.

reduce Parameters

  • StreamExpression: (Mandatory)

  • by: (Mandatory) A comma separated list of fields to group by.

  • Reduce Operation: (Mandatory)

reduce Syntax

reduce(search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_s asc, a_f asc"),
       by="a_s",
       group(sort="a_f desc", n="4")
)

rollup

The rollup function wraps another stream function and rolls up aggregates over bucket fields. The rollup function relies on the sort order of the underlying stream to rollup aggregates one grouping at a time. Accordingly, the sort order of the underlying stream must match the fields in the over parameter of the rollup function.

The rollup function also needs to process entire result sets in order to perform its aggregations. When the underlying stream is the search function, the /export handler can be used to provide full sorted result sets to the rollup function. This sorted approach allows the rollup function to perform aggregations over very high cardinality fields. The disadvantage of this approach is that the tuples must be sorted and streamed across the network to a worker node to be aggregated. For faster aggregation over low to moderate cardinality fields, the facet function can be used.

rollup Parameters

  • StreamExpression (Mandatory)

  • over: (Mandatory) A list of fields to group by.

  • metrics: (Mandatory) The list of metrics to compute. Currently supported metrics are sum(col), avg(col), min(col), max(col), count(*).

rollup Syntax

rollup(
   search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
   over="a_s",
   sum(a_i),
   sum(a_f),
   min(a_i),
   min(a_f),
   max(a_i),
   max(a_f),
   avg(a_i),
   avg(a_f),
   count(*)
)

The example about shows the rollup function wrapping the search function. Notice that search function is using the /export handler to provide the entire result set to the rollup stream. Also notice that the search function’s sort parameter matches up with the rollup’s over parameter. This allows the rollup function to rollup the over the a_s field, one group at a time.

scoreNodes

See section in graph traversal.

select

The select function wraps a streaming expression and outputs tuples containing a subset or modified set of fields from the incoming tuples. The list of fields included in the output tuple can contain aliases to effectively rename fields. The select stream supports both operations and evaluators. One can provide a list of operations and evaluators to perform on any fields, such as replace, add, if, etc.

select Parameters

  • StreamExpression

  • fieldName: name of field to include in the output tuple (can include multiple of these), such as outputTuple[fieldName] = inputTuple[fieldName]

  • fieldName as aliasFieldName: aliased field name to include in the output tuple (can include multiple of these), such as outputTuple[aliasFieldName] = incomingTuple[fieldName]

  • replace(fieldName, value, withValue=replacementValue): if incomingTuple[fieldName] == value then outgoingTuple[fieldName] will be set to replacementValue. value can be the string "null" to replace a null value with some other value.

  • replace(fieldName, value, withField=otherFieldName): if incomingTuple[fieldName] == value then outgoingTuple[fieldName] will be set to the value of incomingTuple[otherFieldName]. value can be the string "null" to replace a null value with some other value.

select Syntax

// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
  search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
  teamName_s as teamName,
  wins,
  losses,
  replace(wins,null,withValue=0),
  replace(losses,null,withValue=0),
  if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)

sort

The sort function wraps a streaming expression and re-orders the tuples. The sort function emits all incoming tuples in the new sort order. The sort function reads all tuples from the incoming stream, re-orders them using an algorithm with O(nlog(n)) performance characteristics, where n is the total number of tuples in the incoming stream, and then outputs the tuples in the new sort order. Because all tuples are read into memory, the memory consumption of this function grows linearly with the number of tuples in the incoming stream.

sort Parameters

  • StreamExpression

  • by: Sort criteria for re-ordering the tuples

sort Syntax

The expression below finds dog owners and orders the results by owner and pet name. Notice that it uses an efficient innerJoin by first ordering by the person/owner id and then re-orders the final output by the owner and pet names.

sort(
  innerJoin(
    search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
    search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
    on="id=owner"
  ),
  by="name asc, petName asc"
)

top

The top function wraps a streaming expression and re-orders the tuples. The top function emits only the top N tuples in the new sort order. The top function re-orders the underlying stream so the sort criteria does not have to match up with the underlying stream.

top Parameters

  • n: Number of top tuples to return.

  • StreamExpression

  • sort: Sort criteria for selecting the top N tuples.

top Syntax

The expression below finds the top 3 results of the underlying search. Notice that it reverses the sort order. The top function re-orders the results of the underlying stream.

top(n=3,
     search(collection1,
            q="*:*",
            qt="/export",
            fl="id,a_s,a_i,a_f",
            sort="a_f desc, a_i desc"),
      sort="a_f asc, a_i asc")

unique

The unique function wraps a streaming expression and emits a unique stream of tuples based on the over parameter. The unique function relies on the sort order of the underlying stream. The over parameter must match up with the sort order of the underlying stream.

The unique function implements a non-co-located unique algorithm. This means that records with the same unique over field do not need to be co-located on the same shard. When executed in the parallel, the partitionKeys parameter must be the same as the unique over field so that records with the same keys will be shuffled to the same worker.

unique Parameters

  • StreamExpression

  • over: The unique criteria.

unique Syntax

unique(
  search(collection1,
         q="*:*",
         qt="/export",
         fl="id,a_s,a_i,a_f",
         sort="a_f asc, a_i asc"),
  over="a_f")

update

The update function wraps another functions and sends the tuples to a SolrCloud collection for indexing as Documents.

update Parameters

  • destinationCollection: (Mandatory) The collection where the tuples will indexed.

  • batchSize: (Mandatory) The indexing batch size.

  • pruneVersionField: (Optional, defaults to true) Wether to prune _version_ values from tuples

  • StreamExpression: (Mandatory)

update Syntax

 update(destinationCollection,
        batchSize=500,
        search(collection1,
               q=*:*,
               qt="/export",
               fl="id,a_s,a_i,a_f,s_multi,i_multi",
               sort="a_f asc, a_i asc"))

The example above sends the tuples returned by the search function to the destinationCollection to be indexed.

Wrapping search(…​) as showing in this example is the common case usage of this decorator: to read documents from a collection as tuples, process or modify them in some way, and then add them back to a new collection. For this reason, pruneVersionField=true is the default behavior — stripping any _version_ values found in the inner stream when converting the tuples to Solr documents to prevent any unexpected errors from Optimistic concurrency constraints.