<?php
namespace Elastica;
use Elastica\Bulk\Action;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\InvalidException;
use Elastica\Script\AbstractScript;
use Elasticsearch\Endpoints\AbstractEndpoint;
use Elasticsearch\Endpoints\Indices\ForceMerge;
use Elasticsearch\Endpoints\Indices\Refresh;
use Elasticsearch\Endpoints\Update;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
* Client to connect to the elasticsearch server.
*
* @author Nicolas Ruflin <spam@ruflin.com>
*/
class Client
{
/**
 * Client configuration, pre-populated with default values.
 *
 * Values passed to the constructor or to setConfig() are written over these
 * defaults key by key. Keys that are not listed here (for example 'servers',
 * 'connectionStrategy' or 'headers') may be added at runtime.
 *
 * log: Set to true, to enable logging, set a string to log to a specific file
 * retryOnConflict: Use in \Elastica\Client::updateDocument
 * bigintConversion: Set to true to enable the JSON bigint to string conversion option (see issue #717)
 * connections: One parameter array per connection, consumed by _initConnections()
 * roundRobin: When true and no 'connectionStrategy' is configured, the 'RoundRobin' strategy is selected
 *
 * @var array
 */
protected $_config = [
'host' => null,
'port' => null,
'path' => null,
'url' => null,
'proxy' => null,
'transport' => null,
'persistent' => true,
'timeout' => null,
'connections' => [], // host, port, path, timeout, transport, compression, persistent, username, password, config -> (curl, headers, url)
'roundRobin' => false,
'log' => false,
'retryOnConflict' => 0,
'bigintConversion' => false,
'username' => null,
'password' => null,
];
/**
 * Optional callback received by the constructor and handed to the
 * connection pool in _initConnections().
 *
 * @var callable|null
 */
protected $_callback;
/**
 * Pool holding the client's connections together with the selection strategy.
 *
 * @var Connection\ConnectionPool
 */
protected $_connectionPool;
/**
 * Most recent request object created by request().
 *
 * @var \Elastica\Request|null
 */
protected $_lastRequest;
/**
 * Response of the most recent request; reset to null by request() before
 * the request is sent.
 *
 * @var \Elastica\Response|null
 */
protected $_lastResponse;
/**
 * Logger used by _log(); a NullLogger when no logger is supplied or configured.
 *
 * @var LoggerInterface
 */
protected $_logger;
/**
 * Server version number, cached by getVersion() after its first lookup.
 *
 * @var string
 */
protected $_version;
/**
 * Builds a new Elastica client and initialises its connections.
 *
 * When no logger is passed, a truthy 'log' config entry creates an
 * \Elastica\Log instance from that value; otherwise a NullLogger is used.
 *
 * @param array           $config   OPTIONAL Config options, applied on top of the defaults
 * @param callback        $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
 * @param LoggerInterface $logger   OPTIONAL Logger to use instead of the config-driven one
 */
public function __construct(array $config = [], $callback = null, LoggerInterface $logger = null)
{
    $this->_callback = $callback;

    if (!$logger) {
        $logger = empty($config['log']) ? new NullLogger() : new Log($config['log']);
    }
    $this->_logger = $logger;

    $this->setConfig($config);
    $this->_initConnections();
}
/**
 * Returns the server version number.
 *
 * The value is read from the response to a request on '/' and cached, so
 * the server is only asked while no version is stored yet.
 *
 * @return string
 */
public function getVersion()
{
    if (!$this->_version) {
        $info = $this->request('/')->getData();
        $this->_version = $info['version']['number'];
    }

    return $this->_version;
}
/**
 * Builds the connection pool from the current configuration.
 *
 * Connections are created from the 'connections' entry and, when present,
 * the 'servers' entry. If neither yields a connection, a single default
 * connection is created from the whole client config. When no
 * 'connectionStrategy' is configured it is set to 'RoundRobin' or 'Simple'
 * depending on the 'roundRobin' flag.
 */
protected function _initConnections()
{
    $definitionSets = [$this->getConfig('connections')];
    if (isset($this->_config['servers'])) {
        $definitionSets[] = $this->getConfig('servers');
    }

    $pool = [];
    foreach ($definitionSets as $definitions) {
        foreach ($definitions as $definition) {
            $pool[] = Connection::create($this->_prepareConnectionParams($definition));
        }
    }

    // Nothing configured explicitly: fall back to one connection built from the client config
    if (!$pool) {
        $pool[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
    }

    if (!isset($this->_config['connectionStrategy'])) {
        $strategyName = true === $this->getConfig('roundRobin') ? 'RoundRobin' : 'Simple';
        $this->setConfigValue('connectionStrategy', $strategyName);
    }

    $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));
    $this->_connectionPool = new Connection\ConnectionPool($pool, $strategy, $this->_callback);
}
/**
 * Creates a Connection params array from a Client or server config array.
 *
 * The keys 'bigintConversion', 'curl', 'headers' and 'url' are moved into the
 * nested 'config' entry; every other key is copied to the top level.
 *
 * @param array $config
 *
 * @return array
 */
protected function _prepareConnectionParams(array $config)
{
    $nestedKeys = ['bigintConversion', 'curl', 'headers', 'url'];

    $params = ['config' => []];
    foreach ($config as $key => $value) {
        // Strict comparison: before PHP 8 the integer key 0 compares loosely equal
        // to any non-numeric string and would wrongly end up in the nested config.
        if (in_array($key, $nestedKeys, true)) {
            $params['config'][$key] = $value;
        } else {
            $params[$key] = $value;
        }
    }

    return $params;
}
/**
 * Applies the given config values on top of the current configuration.
 *
 * Existing keys are overwritten, unknown keys are added, and keys that are
 * not part of $config keep their current value.
 *
 * @param array $config Params
 *
 * @return $this
 */
public function setConfig(array $config)
{
    $this->_config = array_replace($this->_config, $config);

    return $this;
}
/**
 * Returns one config value, or the whole config array when no key is given.
 *
 * Any key that PHP's empty() treats as empty (the default '', null, '0', ...)
 * yields the whole config array.
 *
 * @param string $key Config key
 *
 * @throws \Elastica\Exception\InvalidException If the key does not exist in the config
 *
 * @return array|string Config value
 */
public function getConfig($key = '')
{
    if (empty($key)) {
        return $this->_config;
    }

    if (array_key_exists($key, $this->_config)) {
        return $this->_config[$key];
    }

    throw new InvalidException('Config key is not set: '.$key);
}
/**
 * Sets or overwrites a single config value by delegating to setConfig().
 *
 * @param string $key   Key to set
 * @param mixed  $value Value
 *
 * @return $this
 */
public function setConfigValue($key, $value)
{
    $entry = [$key => $value];

    return $this->setConfig($entry);
}
/**
 * Looks up a (possibly nested) config value without throwing.
 *
 * The keys are followed one after another into the config array. As soon as
 * a step is missing or null, $default is returned.
 *
 * @param array|string $keys    config key or path of config keys
 * @param mixed        $default default value will be returned if key was not found
 *
 * @return mixed
 */
public function getConfigValue($keys, $default = null)
{
    $current = $this->_config;
    foreach ((array) $keys as $segment) {
        if (!isset($current[$segment])) {
            return $default;
        }
        $current = $current[$segment];
    }

    return $current;
}
/**
 * Creates an Index object for the given name, bound to this client.
 *
 * @param string $name Index name
 *
 * @return \Elastica\Index Index for the given name
 */
public function getIndex($name)
{
return new Index($this, $name);
}
/**
 * Stores a HTTP header in the 'headers' config entry.
 *
 * @param string $header      The HTTP Header
 * @param string $headerValue The HTTP Header Value
 *
 * @throws \Elastica\Exception\InvalidException If $header or $headerValue is not a string
 *
 * @return $this
 */
public function addHeader($header, $headerValue)
{
    if (!is_string($header) || !is_string($headerValue)) {
        throw new InvalidException('Header must be a string');
    }

    $this->_config['headers'][$header] = $headerValue;

    return $this;
}
/**
 * Removes a HTTP header from the 'headers' config entry.
 *
 * Removing a header that was never added is a no-op. This also holds when
 * no header has been added at all and the 'headers' entry does not exist.
 *
 * @param string $header The HTTP Header to remove
 *
 * @throws \Elastica\Exception\InvalidException If $header is not a string
 *
 * @return $this
 */
public function removeHeader($header)
{
    if (!is_string($header)) {
        throw new InvalidException('Header must be a string');
    }

    // 'headers' is not part of the default config, so it only exists once a header
    // was added; guard the access instead of reading an undefined key.
    if (isset($this->_config['headers']) && is_array($this->_config['headers'])) {
        unset($this->_config['headers'][$header]);
    }

    return $this;
}
/**
 * Sends the given documents to the server as one bulk request of update actions.
 *
 * Array of \Elastica\Document as input. Index and type have to be
 * set inside the document, because in a bulk request documents
 * can belong to any type and index.
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 *
 * @param array|\Elastica\Document[] $docs          Array of Elastica\Document
 * @param array                      $requestParams Request params set on the bulk request (name => value)
 *
 * @throws \Elastica\Exception\InvalidException If docs is empty
 *
 * @return \Elastica\Bulk\ResponseSet Response object
 */
public function updateDocuments(array $docs, array $requestParams = [])
{
    if (0 === count($docs)) {
        throw new InvalidException('Array has to consist of at least one element');
    }

    $bulkRequest = new Bulk($this);
    $bulkRequest->addDocuments($docs, Action::OP_TYPE_UPDATE);

    foreach ($requestParams as $paramName => $paramValue) {
        $bulkRequest->setRequestParam($paramName, $paramValue);
    }

    return $bulkRequest->send();
}
/**
 * Sends the given documents to the server as one bulk request.
 *
 * Array of \Elastica\Document as input. Index and type have to be
 * set inside the document, because in a bulk request documents
 * can belong to any type and index. No explicit operation type is passed
 * to the bulk object, so its default applies.
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 *
 * @param array|\Elastica\Document[] $docs          Array of Elastica\Document
 * @param array                      $requestParams Request params set on the bulk request (name => value)
 *
 * @throws \Elastica\Exception\InvalidException If docs is empty
 *
 * @return \Elastica\Bulk\ResponseSet Response object
 */
public function addDocuments(array $docs, array $requestParams = [])
{
    if (0 === count($docs)) {
        throw new InvalidException('Array has to consist of at least one element');
    }

    $bulkRequest = new Bulk($this);
    $bulkRequest->addDocuments($docs);

    foreach ($requestParams as $paramName => $paramValue) {
        $bulkRequest->setRequestParam($paramName, $paramValue);
    }

    return $bulkRequest->send();
}
/**
 * Update document, using update script. Requires elasticsearch >= 0.19.0.
 *
 * The request body depends on the type of $data:
 *  - a script is sent as its array representation,
 *  - a document is sent as 'doc' (plus 'doc_as_upsert' when enabled) and
 *    contributes its own options wherever $options does not define them,
 *  - anything else is sent unchanged.
 *
 * When auto-populate is active (on the document or via the
 * ['document', 'autoPopulate'] config value), the document is updated from
 * a successful response.
 *
 * @param int|string                                               $id      document id
 * @param array|\Elastica\Script\AbstractScript|\Elastica\Document $data    raw data for request body
 * @param string                                                   $index   index to update
 * @param string                                                   $type    type of index to update
 * @param array                                                    $options array of query params to use for query. For possible options check es api
 *
 * @return \Elastica\Response
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
 */
public function updateDocument($id, $data, $index, $type, array $options = [])
{
    $endpoint = new Update();
    $endpoint->setID($id);
    $endpoint->setIndex($index);
    $endpoint->setType($type);

    $isScript = $data instanceof AbstractScript;
    $isDocument = $data instanceof Document;

    if ($isScript) {
        $requestData = $data->toArray();
    } elseif ($isDocument) {
        $requestData = ['doc' => $data->getData()];
        if ($data->getDocAsUpsert()) {
            $requestData['doc_as_upsert'] = true;
        }

        // Array union: values already present in $options win over the document's options
        $options += $data->getOptions(
            [
                'version',
                'version_type',
                'routing',
                'percolate',
                'parent',
                'fields',
                'retry_on_conflict',
                'consistency',
                'replication',
                'refresh',
                'timeout',
            ]
        );

        // Request the source back only when auto-populate is on and no fields were chosen
        $autoPopulate = $data->isAutoPopulate()
            || $this->getConfigValue(['document', 'autoPopulate'], false);
        if ($autoPopulate && !isset($options['fields'])) {
            $options['fields'] = '_source';
        }
    } else {
        $requestData = $data;
    }

    // Attach the upsert document, if the script or document carries one
    if (($isScript || $isDocument) && $data->hasUpsert()) {
        $requestData['upsert'] = $data->getUpsert()->getData();
    }

    // Fall back to the client-wide retry setting when the caller gave none
    if (!isset($options['retry_on_conflict'])) {
        $retryOnConflict = $this->getConfig('retryOnConflict');
        if ($retryOnConflict) {
            $options['retry_on_conflict'] = $retryOnConflict;
        }
    }

    $endpoint->setBody($requestData);
    $endpoint->setParams($options);
    $response = $this->requestEndpoint($endpoint);

    if (!$response->isOk() || !$isDocument) {
        return $response;
    }

    if (!$data->isAutoPopulate() && !$this->getConfigValue(['document', 'autoPopulate'], false)) {
        return $response;
    }

    $responseData = $response->getData();
    if (isset($responseData['_version'])) {
        $data->setVersion($responseData['_version']);
    }
    if (isset($options['fields'])) {
        $this->_populateDocumentFieldsFromResponse($response, $data, $options['fields']);
    }

    return $response;
}
/**
 * Copies data returned under the response's 'get' section into the document.
 *
 * With '_source' the whole document data is replaced by the returned source
 * (if one is present and is an array). Otherwise $fields is split on commas:
 * each listed field is set from the response, or removed from the document
 * data when the response does not contain it.
 *
 * @param \Elastica\Response $response
 * @param \Elastica\Document $document
 * @param string             $fields   Comma-separated field names to be populated or '_source' if whole document data should be updated
 */
protected function _populateDocumentFieldsFromResponse(Response $response, Document $document, $fields)
{
    $responseData = $response->getData();

    if ('_source' == $fields) {
        $hasSource = isset($responseData['get']['_source'])
            && is_array($responseData['get']['_source']);
        if ($hasSource) {
            $document->setData($responseData['get']['_source']);
        }

        return;
    }

    $fieldNames = explode(',', $fields);
    $documentData = $document->getData();
    foreach ($fieldNames as $fieldName) {
        if (isset($responseData['get']['fields'][$fieldName])) {
            $documentData[$fieldName] = $responseData['get']['fields'][$fieldName];
        } elseif (isset($documentData[$fieldName])) {
            unset($documentData[$fieldName]);
        }
    }
    $document->setData($documentData);
}
/**
 * Sends the given documents to the server as one bulk request of delete actions.
 *
 * @param array|\Elastica\Document[] $docs          Documents to delete
 * @param array                      $requestParams Request params set on the bulk request (name => value)
 *
 * @throws \Elastica\Exception\InvalidException If docs is empty
 *
 * @return \Elastica\Bulk\ResponseSet
 */
public function deleteDocuments(array $docs, array $requestParams = [])
{
    if (0 === count($docs)) {
        throw new InvalidException('Array has to consist of at least one element');
    }

    $bulkRequest = new Bulk($this);
    $bulkRequest->addDocuments($docs, Action::OP_TYPE_DELETE);

    foreach ($requestParams as $paramName => $paramValue) {
        $bulkRequest->setRequestParam($paramName, $paramValue);
    }

    return $bulkRequest->send();
}
/**
 * Creates a Status object bound to this client.
 *
 * @return \Elastica\Status Status object
 */
public function getStatus()
{
return new Status($this);
}
/**
 * Creates a Cluster object bound to this client.
 *
 * @return \Elastica\Cluster Cluster object
 */
public function getCluster()
{
return new Cluster($this);
}
/**
 * Establishes the client connections.
 *
 * Delegates to _initConnections(), which builds a new connection pool from
 * the current configuration, and passes on that method's return value.
 */
public function connect()
{
return $this->_initConnections();
}
/**
 * Adds a connection to the client's connection pool.
 *
 * @param \Elastica\Connection $connection
 *
 * @return $this
 */
public function addConnection(Connection $connection)
{
$this->_connectionPool->addConnection($connection);
return $this;
}
/**
 * Determines whether a valid connection is available for use.
 *
 * The answer comes from the connection pool.
 *
 * @return bool
 */
public function hasConnection()
{
return $this->_connectionPool->hasConnection();
}
/**
 * Returns the connection selected by the connection pool.
 *
 * @throws \Elastica\Exception\ClientException
 *
 * @return \Elastica\Connection
 */
public function getConnection()
{
return $this->_connectionPool->getConnection();
}
/**
 * Returns the connections held by the connection pool.
 *
 * @return \Elastica\Connection[]
 */
public function getConnections()
{
return $this->_connectionPool->getConnections();
}
/**
 * Returns the strategy the connection pool was configured with.
 *
 * @return \Elastica\Connection\Strategy\StrategyInterface
 */
public function getConnectionStrategy()
{
return $this->_connectionPool->getStrategy();
}
/**
 * Hands the given connections to the connection pool via its setConnections().
 *
 * @param array|\Elastica\Connection[] $connections
 *
 * @return $this
 */
public function setConnections(array $connections)
{
$this->_connectionPool->setConnections($connections);
return $this;
}
/**
 * Deletes documents with the given ids, index, type from the index.
 *
 * One bulk delete action is created per id. When a routing value is given it
 * is applied to every action. The values false, null and '' mean "no routing",
 * while 0 and '0' are passed on as routing values.
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 *
 * @param array                  $ids     Document ids
 * @param string|\Elastica\Index $index   Index name
 * @param string|\Elastica\Type  $type    Type of documents
 * @param string|bool            $routing Optional routing key for all ids
 *
 * @throws \Elastica\Exception\InvalidException If ids is empty
 *
 * @return \Elastica\Bulk\ResponseSet Response object
 */
public function deleteIds(array $ids, $index, $type, $routing = false)
{
    if (empty($ids)) {
        throw new InvalidException('Array has to consist of at least one id');
    }

    $bulk = new Bulk($this);
    $bulk->setIndex($index);
    $bulk->setType($type);

    // empty() alone would treat the routing values 0 and '0' as "not set" and drop them.
    // The check does not depend on the id, so it is evaluated once.
    $hasRouting = !empty($routing) || is_numeric($routing);

    foreach ($ids as $id) {
        $action = new Action(Action::OP_TYPE_DELETE);
        $action->setId($id);
        if ($hasRouting) {
            $action->setRouting($routing);
        }
        $bulk->addAction($action);
    }

    return $bulk->send();
}
/**
 * Bulk operation on raw data.
 *
 * Every entry in the params array has to be exactly one array
 * of the bulk operation. An example param array would be:
 *
 * array(
 *         array('index' => array('_index' => 'test', '_type' => 'user', '_id' => '1')),
 *         array('user' => array('name' => 'hans')),
 *         array('delete' => array('_index' => 'test', '_type' => 'user', '_id' => '2'))
 * );
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 *
 * @param array $params Parameter array
 *
 * @throws \Elastica\Exception\ResponseException
 * @throws \Elastica\Exception\InvalidException If params is empty
 *
 * @return \Elastica\Bulk\ResponseSet Response object
 */
public function bulk(array $params)
{
    if (0 === count($params)) {
        throw new InvalidException('Array has to consist of at least one param');
    }

    $bulkRequest = new Bulk($this);
    $bulkRequest->addRawData($params);

    return $bulkRequest->send();
}
/**
 * Makes calls to the elasticsearch server.
 *
 * It's possible to make any REST query directly over this method.
 *
 * If sending fails with a ConnectionException, the connection pool is
 * notified, the failure is logged, and the same request (including its
 * content type) is retried as long as a valid connection remains. The
 * request and response are remembered for getLastRequest() and
 * getLastResponse().
 *
 * @param string       $path        Path to call
 * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
 * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
 * @param array        $query       OPTIONAL Query params
 * @param string       $contentType Content-Type sent with this request
 *
 * @throws Exception\ConnectionException|Exception\ClientException
 *
 * @return Response Response object
 */
public function request($path, $method = Request::GET, $data = [], array $query = [], $contentType = Request::DEFAULT_CONTENT_TYPE)
{
    $connection = $this->getConnection();

    $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType);
    $this->_lastResponse = null;

    try {
        $response = $this->_lastResponse = $request->send();
    } catch (ConnectionException $e) {
        $this->_connectionPool->onFail($connection, $e, $this);
        $this->_log($e);

        // In case there is no valid connection left, throw exception which caused the disabling of the connection.
        if (!$this->hasConnection()) {
            throw $e;
        }

        // Retry with the caller's content type; omitting it would silently
        // fall back to the default content type on the retried request.
        return $this->request($path, $method, $data, $query, $contentType);
    }

    $this->_log($request);

    return $response;
}
/**
 * Sends a request described by an official client Endpoint object.
 *
 * URI (without leading slashes), method, body and params are taken from the
 * endpoint; a null body is sent as an empty array.
 *
 * @param AbstractEndpoint $endpoint
 *
 * @return Response
 */
public function requestEndpoint(AbstractEndpoint $endpoint)
{
    $path = ltrim($endpoint->getURI(), '/');
    $method = $endpoint->getMethod();

    $body = $endpoint->getBody();
    if (null === $body) {
        $body = [];
    } else {
        // Fetched a second time on purpose, matching the two reads this method has always done
        $body = $endpoint->getBody();
    }

    return $this->request($path, $method, $body, $endpoint->getParams());
}
/**
 * Writes a log entry for the given context.
 *
 *  - ConnectionException: logged as error together with its request and
 *    whether a connection is still available for a retry.
 *  - Request: logged as debug together with the data and status of the last
 *    response (null when there is none).
 *  - Anything else: logged as debug under the 'message' key.
 *
 * @deprecated Overwriting Client->_log is deprecated. Handle logging functionality by using a custom LoggerInterface.
 *
 * @param mixed $context
 */
protected function _log($context)
{
    if ($context instanceof ConnectionException) {
        $failure = [
            'exception' => $context,
            'request' => $context->getRequest()->toArray(),
            'retry' => $this->hasConnection(),
        ];
        $this->_logger->error('Elastica Request Failure', $failure);
    } elseif ($context instanceof Request) {
        $lastResponse = $this->_lastResponse;
        $details = [
            'request' => $context->toArray(),
            'response' => $lastResponse ? $lastResponse->getData() : null,
            'responseStatus' => $lastResponse ? $lastResponse->getStatus() : null,
        ];
        $this->_logger->debug('Elastica Request', $details);
    } else {
        $this->_logger->debug('Elastica Request', ['message' => $context]);
    }
}
/**
 * Optimizes all search indices.
 *
 * Raises an E_USER_DEPRECATED notice and forwards to forcemergeAll().
 *
 * @param array $args OPTIONAL Optional arguments
 *
 * @return \Elastica\Response Response object
 *
 * @deprecated Replaced by forcemergeAll
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-optimize.html
 */
public function optimizeAll($args = [])
{
    $notice = 'Deprecated: Elastica\Client::optimizeAll() is deprecated and will be removed in further Elastica releases. Use Elastica\Client::forcemergeAll() instead.';
    trigger_error($notice, E_USER_DEPRECATED);

    return $this->forcemergeAll($args);
}
/**
 * Force merges all search indices.
 *
 * Sends a ForceMerge endpoint with the given arguments as its params.
 *
 * @param array $args OPTIONAL Optional arguments
 *
 * @return \Elastica\Response Response object
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
 */
public function forcemergeAll($args = [])
{
    $forceMerge = new ForceMerge();
    $forceMerge->setParams($args);

    $response = $this->requestEndpoint($forceMerge);

    return $response;
}
/**
 * Refreshes all search indices.
 *
 * Sends a Refresh endpoint without any further settings.
 *
 * @return \Elastica\Response Response object
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
 */
public function refreshAll()
{
    $refresh = new Refresh();

    return $this->requestEndpoint($refresh);
}
/**
 * Returns the request object created by the most recent request() call.
 *
 * @return Request|null
 */
public function getLastRequest()
{
return $this->_lastRequest;
}
/**
 * Returns the response of the most recent request() call.
 *
 * The value is null when the last request did not produce a response,
 * because request() resets it before sending.
 *
 * @return Response|null
 */
public function getLastResponse()
{
return $this->_lastResponse;
}
/**
 * Replace the existing logger.
 *
 * @param LoggerInterface $logger Logger used by _log() from now on
 *
 * @return $this
 */
public function setLogger(LoggerInterface $logger)
{
$this->_logger = $logger;
return $this;
}
}