vendor/ruflin/elastica/lib/Elastica/Bulk.php line 360

Open in your IDE?
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
  5. use Elastica\Bulk\Response as BulkResponse;
  6. use Elastica\Bulk\ResponseSet;
  7. use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
  8. use Elastica\Exception\InvalidException;
  9. use Elastica\Script\AbstractScript;
  10. class Bulk
  11. {
  12.     const DELIMITER "\n";
  13.     /**
  14.      * @var \Elastica\Client
  15.      */
  16.     protected $_client;
  17.     /**
  18.      * @var \Elastica\Bulk\Action[]
  19.      */
  20.     protected $_actions = [];
  21.     /**
  22.      * @var string|null
  23.      */
  24.     protected $_index;
  25.     /**
  26.      * @var string|null
  27.      */
  28.     protected $_type;
  29.     /**
  30.      * @var array request parameters to the bulk api
  31.      */
  32.     protected $_requestParams = [];
  33.     /**
  34.      * @param \Elastica\Client $client
  35.      */
  36.     public function __construct(Client $client)
  37.     {
  38.         $this->_client $client;
  39.     }
  40.     /**
  41.      * @param string|\Elastica\Index $index
  42.      *
  43.      * @return $this
  44.      */
  45.     public function setIndex($index)
  46.     {
  47.         if ($index instanceof Index) {
  48.             $index $index->getName();
  49.         }
  50.         $this->_index = (string) $index;
  51.         return $this;
  52.     }
  53.     /**
  54.      * @return string|null
  55.      */
  56.     public function getIndex()
  57.     {
  58.         return $this->_index;
  59.     }
  60.     /**
  61.      * @return bool
  62.      */
  63.     public function hasIndex()
  64.     {
  65.         return null !== $this->getIndex() && '' !== $this->getIndex();
  66.     }
  67.     /**
  68.      * @param string|\Elastica\Type $type
  69.      *
  70.      * @return $this
  71.      */
  72.     public function setType($type)
  73.     {
  74.         if ($type instanceof Type) {
  75.             $this->setIndex($type->getIndex()->getName());
  76.             $type $type->getName();
  77.         }
  78.         $this->_type = (string) $type;
  79.         return $this;
  80.     }
  81.     /**
  82.      * @return string|null
  83.      */
  84.     public function getType()
  85.     {
  86.         return $this->_type;
  87.     }
  88.     /**
  89.      * @return bool
  90.      */
  91.     public function hasType()
  92.     {
  93.         return null !== $this->getType() && '' !== $this->getType();
  94.     }
  95.     /**
  96.      * @return string
  97.      */
  98.     public function getPath()
  99.     {
  100.         $path '';
  101.         if ($this->hasIndex()) {
  102.             $path .= $this->getIndex().'/';
  103.             if ($this->hasType()) {
  104.                 $path .= $this->getType().'/';
  105.             }
  106.         }
  107.         $path .= '_bulk';
  108.         return $path;
  109.     }
  110.     /**
  111.      * @param \Elastica\Bulk\Action $action
  112.      *
  113.      * @return $this
  114.      */
  115.     public function addAction(Action $action)
  116.     {
  117.         $this->_actions[] = $action;
  118.         return $this;
  119.     }
  120.     /**
  121.      * @param \Elastica\Bulk\Action[] $actions
  122.      *
  123.      * @return $this
  124.      */
  125.     public function addActions(array $actions)
  126.     {
  127.         foreach ($actions as $action) {
  128.             $this->addAction($action);
  129.         }
  130.         return $this;
  131.     }
  132.     /**
  133.      * @return \Elastica\Bulk\Action[]
  134.      */
  135.     public function getActions()
  136.     {
  137.         return $this->_actions;
  138.     }
  139.     /**
  140.      * @param \Elastica\Document $document
  141.      * @param string             $opType
  142.      *
  143.      * @return $this
  144.      */
  145.     public function addDocument(Document $document$opType null)
  146.     {
  147.         $action AbstractDocumentAction::create($document$opType);
  148.         return $this->addAction($action);
  149.     }
  150.     /**
  151.      * @param \Elastica\Document[] $documents
  152.      * @param string               $opType
  153.      *
  154.      * @return $this
  155.      */
  156.     public function addDocuments(array $documents$opType null)
  157.     {
  158.         foreach ($documents as $document) {
  159.             $this->addDocument($document$opType);
  160.         }
  161.         return $this;
  162.     }
  163.     /**
  164.      * @param \Elastica\Script\AbstractScript $script
  165.      * @param string                          $opType
  166.      *
  167.      * @return $this
  168.      */
  169.     public function addScript(AbstractScript $script$opType null)
  170.     {
  171.         $action AbstractDocumentAction::create($script$opType);
  172.         return $this->addAction($action);
  173.     }
  174.     /**
  175.      * @param \Elastica\Document[] $scripts
  176.      * @param string               $opType
  177.      *
  178.      * @return $this
  179.      */
  180.     public function addScripts(array $scripts$opType null)
  181.     {
  182.         foreach ($scripts as $document) {
  183.             $this->addScript($document$opType);
  184.         }
  185.         return $this;
  186.     }
  187.     /**
  188.      * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
  189.      * @param string                                                   $opType
  190.      *
  191.      * @return $this
  192.      */
  193.     public function addData($data$opType null)
  194.     {
  195.         if (!is_array($data)) {
  196.             $data = [$data];
  197.         }
  198.         foreach ($data as $actionData) {
  199.             if ($actionData instanceof AbstractScript) {
  200.                 $this->addScript($actionData$opType);
  201.             } elseif ($actionData instanceof Document) {
  202.                 $this->addDocument($actionData$opType);
  203.             } else {
  204.                 throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
  205.             }
  206.         }
  207.         return $this;
  208.     }
  209.     /**
  210.      * @param array $data
  211.      *
  212.      * @throws \Elastica\Exception\InvalidException
  213.      *
  214.      * @return $this
  215.      */
  216.     public function addRawData(array $data)
  217.     {
  218.         foreach ($data as $row) {
  219.             if (is_array($row)) {
  220.                 $opType key($row);
  221.                 $metadata reset($row);
  222.                 if (Action::isValidOpType($opType)) {
  223.                     // add previous action
  224.                     if (isset($action)) {
  225.                         $this->addAction($action);
  226.                     }
  227.                     $action = new Action($opType$metadata);
  228.                 } elseif (isset($action)) {
  229.                     $action->setSource($row);
  230.                     $this->addAction($action);
  231.                     $action null;
  232.                 } else {
  233.                     throw new InvalidException('Invalid bulk data, source must follow action metadata');
  234.                 }
  235.             } else {
  236.                 throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
  237.             }
  238.         }
  239.         // add last action if available
  240.         if (isset($action)) {
  241.             $this->addAction($action);
  242.         }
  243.         return $this;
  244.     }
  245.     /**
  246.      * Set a url parameter on the request bulk request.
  247.      *
  248.      * @param string $name  name of the parameter
  249.      * @param string $value value of the parameter
  250.      *
  251.      * @return $this
  252.      */
  253.     public function setRequestParam($name$value)
  254.     {
  255.         $this->_requestParams[$name] = $value;
  256.         return $this;
  257.     }
  258.     /**
  259.      * Set the amount of time that the request will wait the shards to come on line.
  260.      * Requires Elasticsearch version >= 0.90.8.
  261.      *
  262.      * @param string $time timeout in Elasticsearch time format
  263.      *
  264.      * @return $this
  265.      */
  266.     public function setShardTimeout($time)
  267.     {
  268.         return $this->setRequestParam('timeout'$time);
  269.     }
  270.     /**
  271.      * @return string
  272.      */
  273.     public function __toString()
  274.     {
  275.         return $this->toString();
  276.     }
  277.     /**
  278.      * @return string
  279.      */
  280.     public function toString()
  281.     {
  282.         $data '';
  283.         foreach ($this->getActions() as $action) {
  284.             $data .= $action->toString();
  285.         }
  286.         return $data;
  287.     }
  288.     /**
  289.      * @return array
  290.      */
  291.     public function toArray()
  292.     {
  293.         $data = [];
  294.         foreach ($this->getActions() as $action) {
  295.             foreach ($action->toArray() as $row) {
  296.                 $data[] = $row;
  297.             }
  298.         }
  299.         return $data;
  300.     }
  301.     /**
  302.      * @return \Elastica\Bulk\ResponseSet
  303.      */
  304.     public function send()
  305.     {
  306.         $path $this->getPath();
  307.         $data $this->toString();
  308.         $response $this->_client->request($pathRequest::POST$data$this->_requestParamsRequest::NDJSON_CONTENT_TYPE);
  309.         return $this->_processResponse($response);
  310.     }
  311.     /**
  312.      * @param \Elastica\Response $response
  313.      *
  314.      * @throws \Elastica\Exception\Bulk\ResponseException
  315.      * @throws \Elastica\Exception\InvalidException
  316.      *
  317.      * @return \Elastica\Bulk\ResponseSet
  318.      */
  319.     protected function _processResponse(Response $response)
  320.     {
  321.         $responseData $response->getData();
  322.         $actions $this->getActions();
  323.         $bulkResponses = [];
  324.         if (isset($responseData['items']) && is_array($responseData['items'])) {
  325.             foreach ($responseData['items'] as $key => $item) {
  326.                 if (!isset($actions[$key])) {
  327.                     throw new InvalidException('No response found for action #'.$key);
  328.                 }
  329.                 $action $actions[$key];
  330.                 $opType key($item);
  331.                 $bulkResponseData reset($item);
  332.                 if ($action instanceof AbstractDocumentAction) {
  333.                     $data $action->getData();
  334.                     if ($data instanceof Document && $data->isAutoPopulate()
  335.                         || $this->_client->getConfigValue(['document''autoPopulate'], false)
  336.                     ) {
  337.                         if (!$data->hasId() && isset($bulkResponseData['_id'])) {
  338.                             $data->setId($bulkResponseData['_id']);
  339.                         }
  340.                         if (isset($bulkResponseData['_version'])) {
  341.                             $data->setVersion($bulkResponseData['_version']);
  342.                         }
  343.                     }
  344.                 }
  345.                 $bulkResponses[] = new BulkResponse($bulkResponseData$action$opType);
  346.             }
  347.         }
  348.         $bulkResponseSet = new ResponseSet($response$bulkResponses);
  349.         if ($bulkResponseSet->hasError()) {
  350.             throw new BulkResponseException($bulkResponseSet);
  351.         }
  352.         return $bulkResponseSet;
  353.     }
  354. }