This commit is contained in:
2024-05-20 15:37:46 +03:00
commit 00b7dbd0b7
10404 changed files with 3285853 additions and 0 deletions

View File

@ -0,0 +1,66 @@
<?php
/**
* This a Base class which all Mixpanel classes extend from to provide some very basic
* debugging and logging functionality. It also serves to persist $_options across the library.
*
*/
class Base_MixpanelBase {
/**
* Default options that can be overridden via the $options constructor arg
* @var array
*/
private $_defaults = array(
"max_batch_size" => 50, // the max batch size Mixpanel will accept is 50,
"max_queue_size" => 1000, // the max num of items to hold in memory before flushing
"debug" => false, // enable/disable debug mode
"consumer" => "curl", // which consumer to use
"host" => "api.mixpanel.com", // the host name for api calls
"events_endpoint" => "/track", // host relative endpoint for events
"people_endpoint" => "/engage", // host relative endpoint for people updates
"groups_endpoint" => "/groups", // host relative endpoint for groups updates
"use_ssl" => true, // use ssl when available
"error_callback" => null // callback to use on consumption failures
);
/**
* An array of options to be used by the Mixpanel library.
* @var array
*/
protected $_options = array();
/**
* Construct a new MixpanelBase object and merge custom options with defaults
* @param array $options
*/
public function __construct($options = array()) {
$options = array_merge($this->_defaults, $options);
$this->_options = $options;
}
/**
* Log a message to PHP's error log
* @param $msg
*/
protected function _log($msg) {
$arr = debug_backtrace();
$class = $arr[0]['class'];
$line = $arr[0]['line'];
error_log ( "[ $class - line $line ] : " . $msg );
}
/**
* Returns true if in debug mode, false if in production mode
* @return bool
*/
protected function _debug() {
return isset($this->_options["debug"]) && $this->_options["debug"] == true;
}
}

View File

@ -0,0 +1,65 @@
<?php
require_once(dirname(__FILE__) . "/../Base/MixpanelBase.php");
/**
* Provides some base methods for use by a Consumer implementation
*/
abstract class ConsumerStrategies_AbstractConsumer extends Base_MixpanelBase {
/**
* Creates a new AbstractConsumer
* @param array $options
*/
function __construct($options = array()) {
parent::__construct($options);
if ($this->_debug()) {
$this->_log("Instantiated new Consumer");
}
}
/**
* Encode an array to be persisted
* @param array $params
* @return string
*/
protected function _encode($params) {
return base64_encode(json_encode($params));
}
/**
* Handles errors that occur in a consumer
* @param $code
* @param $msg
*/
protected function _handleError($code, $msg) {
if (isset($this->_options['error_callback'])) {
$handler = $this->_options['error_callback'];
call_user_func($handler, $code, $msg);
}
if ($this->_debug()) {
$arr = debug_backtrace();
$class = get_class($arr[0]['object']);
$line = $arr[0]['line'];
error_log ( "[ $class - line $line ] : " . print_r($msg, true) );
}
}
/**
* Number of requests/batches that will be processed in parallel.
* @return int
*/
public function getNumThreads() {
return 1;
}
/**
* Persist a batch of messages in whatever way the implementer sees fit
* @param array $batch an array of messages to consume
* @return boolean success or fail
*/
abstract function persist($batch);
}

View File

@ -0,0 +1,259 @@
<?php
require_once(dirname(__FILE__) . "/AbstractConsumer.php");
/**
* Consumes messages and sends them to a host/endpoint using cURL
*/
class ConsumerStrategies_CurlConsumer extends ConsumerStrategies_AbstractConsumer {
/**
* @var string the host to connect to (e.g. api.mixpanel.com)
*/
protected $_host;
/**
* @var string the host-relative endpoint to write to (e.g. /engage)
*/
protected $_endpoint;
/**
* @var int connect_timeout The number of seconds to wait while trying to connect. Default is 5 seconds.
*/
protected $_connect_timeout;
/**
* @var int timeout The maximum number of seconds to allow cURL call to execute. Default is 30 seconds.
*/
protected $_timeout;
/**
* @var string the protocol to use for the cURL connection
*/
protected $_protocol;
/**
* @var bool|null true to fork the cURL process (using exec) or false to use PHP's cURL extension. false by default
*/
protected $_fork = null;
/**
* @var int number of cURL requests to run in parallel. 1 by default
*/
protected $_num_threads;
/**
* Creates a new CurlConsumer and assigns properties from the $options array
* @param array $options
* @throws Exception
*/
function __construct($options) {
parent::__construct($options);
$this->_host = $options['host'];
$this->_endpoint = $options['endpoint'];
$this->_connect_timeout = isset($options['connect_timeout']) ? $options['connect_timeout'] : 5;
$this->_timeout = isset($options['timeout']) ? $options['timeout'] : 30;
$this->_protocol = isset($options['use_ssl']) && $options['use_ssl'] == true ? "https" : "http";
$this->_fork = isset($options['fork']) ? ($options['fork'] == true) : false;
$this->_num_threads = isset($options['num_threads']) ? max(1, intval($options['num_threads'])) : 1;
// ensure the environment is workable for the given settings
if ($this->_fork == true) {
$exists = function_exists('exec');
if (!$exists) {
throw new Exception('The "exec" function must exist to use the cURL consumer in "fork" mode. Try setting fork = false or use another consumer.');
}
$disabled = explode(', ', ini_get('disable_functions'));
$enabled = !in_array('exec', $disabled);
if (!$enabled) {
throw new Exception('The "exec" function must be enabled to use the cURL consumer in "fork" mode. Try setting fork = false or use another consumer.');
}
} else {
if (!function_exists('curl_init')) {
throw new Exception('The cURL PHP extension is required to use the cURL consumer with fork = false. Try setting fork = true or use another consumer.');
}
}
}
/**
* Write to the given host/endpoint using either a forked cURL process or using PHP's cURL extension
* @param array $batch
* @return bool
*/
public function persist($batch) {
if (count($batch) > 0) {
$url = $this->_protocol . "://" . $this->_host . $this->_endpoint;
if ($this->_fork) {
$data = "data=" . $this->_encode($batch);
return $this->_execute_forked($url, $data);
} else {
return $this->_execute($url, $batch);
}
} else {
return true;
}
}
/**
* Write using the cURL php extension
* @param $url
* @param $batch
* @return bool
*/
protected function _execute($url, $batch) {
if ($this->_debug()) {
$this->_log("Making blocking cURL call to $url");
}
$mh = curl_multi_init();
$chs = array();
$batch_size = ceil(count($batch) / $this->_num_threads);
for ($i=0; $i<$this->_num_threads && !empty($batch); $i++) {
$ch = curl_init();
$chs[] = $ch;
$data = "data=" . $this->_encode(array_splice($batch, 0, $batch_size));
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->_connect_timeout);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
curl_multi_add_handle($mh,$ch);
}
$running = 0;
do {
curl_multi_exec($mh, $running);
curl_multi_select($mh);
} while ($running > 0);
$info = curl_multi_info_read($mh);
$error = false;
foreach ($chs as $ch) {
$response = curl_multi_getcontent($ch);
if (false === $response) {
$this->_handleError(curl_errno($ch), curl_error($ch));
$error = true;
}
elseif ("1" != trim($response)) {
$this->_handleError(0, $response);
$error = true;
}
curl_multi_remove_handle($mh, $ch);
}
if (CURLE_OK != $info['result']) {
$this->_handleError($info['result'], "cURL error with code=".$info['result']);
$error = true;
}
curl_multi_close($mh);
return !$error;
}
/**
* Write using a forked cURL process
* @param $url
* @param $data
* @return bool
*/
protected function _execute_forked($url, $data) {
if ($this->_debug()) {
$this->_log("Making forked cURL call to $url");
}
$exec = 'curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d ' . $data . ' "' . $url . '"';
if(!$this->_debug()) {
$exec .= " >/dev/null 2>&1 &";
}
exec($exec, $output, $return_var);
if ($return_var != 0) {
$this->_handleError($return_var, $output);
}
return $return_var == 0;
}
/**
* @return int
*/
public function getConnectTimeout()
{
return $this->_connect_timeout;
}
/**
* @return string
*/
public function getEndpoint()
{
return $this->_endpoint;
}
/**
* @return bool|null
*/
public function getFork()
{
return $this->_fork;
}
/**
* @return string
*/
public function getHost()
{
return $this->_host;
}
/**
* @return array
*/
public function getOptions()
{
return $this->_options;
}
/**
* @return string
*/
public function getProtocol()
{
return $this->_protocol;
}
/**
* @return int
*/
public function getTimeout()
{
return $this->_timeout;
}
/**
* Number of requests/batches that will be processed in parallel using curl_multi_exec.
* @return int
*/
public function getNumThreads() {
return $this->_num_threads;
}
}

View File

@ -0,0 +1,38 @@
<?php
require_once(dirname(__FILE__) . "/AbstractConsumer.php");
/**
* Consumes messages and writes them to a file
*/
class ConsumerStrategies_FileConsumer extends ConsumerStrategies_AbstractConsumer {
/**
* @var string path to a file that we want to write the messages to
*/
private $_file;
/**
* Creates a new FileConsumer and assigns properties from the $options array
* @param array $options
*/
function __construct($options) {
parent::__construct($options);
// what file to write to?
$this->_file = isset($options['file']) ? $options['file'] : dirname(__FILE__)."/../../messages.txt";
}
/**
* Append $batch to a file
* @param array $batch
* @return bool
*/
public function persist($batch) {
if (count($batch) > 0) {
return file_put_contents($this->_file, json_encode($batch)."\n", FILE_APPEND | LOCK_EX) !== false;
} else {
return true;
}
}
}

View File

@ -0,0 +1,308 @@
<?php
/**
* Portions of this class were borrowed from
* https://github.com/segmentio/analytics-php/blob/master/lib/Analytics/Consumer/Socket.php.
* Thanks for the work!
*
* WWWWWW||WWWWWW
* W W W||W W W
* ||
* ( OO )__________
* / | \
* /o o| MIT \
* \___/||_||__||_|| *
* || || || ||
* _||_|| _||_||
* (__|__|(__|__|
* (The MIT License)
*
* Copyright (c) 2013 Segment.io Inc. friends@segment.io
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
* OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
require_once(dirname(__FILE__) . "/AbstractConsumer.php");
/**
* Consumes messages and writes them to host/endpoint using a persistent socket
*/
class ConsumerStrategies_SocketConsumer extends ConsumerStrategies_AbstractConsumer {
/**
* @var string the host to connect to (e.g. api.mixpanel.com)
*/
private $_host;
/**
* @var string the host-relative endpoint to write to (e.g. /engage)
*/
private $_endpoint;
/**
* @var int connect_timeout the socket connection timeout in seconds
*/
private $_connect_timeout;
/**
* @var string the protocol to use for the socket connection
*/
private $_protocol;
/**
* @var resource holds the socket resource
*/
private $_socket;
/**
* @var bool whether or not to wait for a response
*/
private $_async;
/**
* Creates a new SocketConsumer and assigns properties from the $options array
* @param array $options
*/
public function __construct($options = array()) {
parent::__construct($options);
$this->_host = $options['host'];
$this->_endpoint = $options['endpoint'];
$this->_connect_timeout = isset($options['connect_timeout']) ? $options['connect_timeout'] : 5;
$this->_async = isset($options['async']) && $options['async'] === false ? false : true;
if (array_key_exists('use_ssl', $options) && $options['use_ssl'] == true) {
$this->_protocol = "ssl";
$this->_port = 443;
} else {
$this->_protocol = "tcp";
$this->_port = 80;
}
}
/**
* Write using a persistent socket connection.
* @param array $batch
* @return bool
*/
public function persist($batch) {
$socket = $this->_getSocket();
if (!is_resource($socket)) {
return false;
}
$data = "data=".$this->_encode($batch);
$body = "";
$body.= "POST ".$this->_endpoint." HTTP/1.1\r\n";
$body.= "Host: " . $this->_host . "\r\n";
$body.= "Content-Type: application/x-www-form-urlencoded\r\n";
$body.= "Accept: application/json\r\n";
$body.= "Content-length: " . strlen($data) . "\r\n";
$body.= "\r\n";
$body.= $data;
return $this->_write($socket, $body);
}
/**
* Return cached socket if open or create a new persistent socket
* @return bool|resource
*/
private function _getSocket() {
if(is_resource($this->_socket)) {
if ($this->_debug()) {
$this->_log("Using existing socket");
}
return $this->_socket;
} else {
if ($this->_debug()) {
$this->_log("Creating new socket at ".time());
}
return $this->_createSocket();
}
}
/**
* Attempt to open a new socket connection, cache it, and return the resource
* @param bool $retry
* @return bool|resource
*/
private function _createSocket($retry = true) {
try {
$socket = pfsockopen($this->_protocol . "://" . $this->_host, $this->_port, $err_no, $err_msg, $this->_connect_timeout);
if ($this->_debug()) {
$this->_log("Opening socket connection to " . $this->_protocol . "://" . $this->_host . ":" . $this->_port);
}
if ($err_no != 0) {
$this->_handleError($err_no, $err_msg);
return $retry == true ? $this->_createSocket(false) : false;
} else {
// cache the socket
$this->_socket = $socket;
return $socket;
}
} catch (Exception $e) {
$this->_handleError($e->getCode(), $e->getMessage());
return $retry == true ? $this->_createSocket(false) : false;
}
}
/**
* Attempt to close and dereference a socket resource
*/
private function _destroySocket() {
$socket = $this->_socket;
$this->_socket = null;
fclose($socket);
}
/**
* Write $data through the given $socket
* @param $socket
* @param $data
* @param bool $retry
* @return bool
*/
private function _write($socket, $data, $retry = true) {
$bytes_sent = 0;
$bytes_total = strlen($data);
$socket_closed = false;
$success = true;
$max_bytes_per_write = 8192;
// if we have no data to write just return true
if ($bytes_total == 0) {
return true;
}
// try to write the data
while (!$socket_closed && $bytes_sent < $bytes_total) {
try {
$bytes = fwrite($socket, $data, $max_bytes_per_write);
if ($this->_debug()) {
$this->_log("Socket wrote ".$bytes." bytes");
}
// if we actually wrote data, then remove the written portion from $data left to write
if ($bytes > 0) {
$data = substr($data, $max_bytes_per_write);
}
} catch (Exception $e) {
$this->_handleError($e->getCode(), $e->getMessage());
$socket_closed = true;
}
if (isset($bytes) && $bytes) {
$bytes_sent += $bytes;
} else {
$socket_closed = true;
}
}
// create a new socket if the current one is closed and retry the message
if ($socket_closed) {
$this->_destroySocket();
if ($retry) {
if ($this->_debug()) {
$this->_log("Retrying socket write...");
}
$socket = $this->_getSocket();
if ($socket) return $this->_write($socket, $data, false);
}
return false;
}
// only wait for the response in debug mode or if we explicitly want to be synchronous
if ($this->_debug() || !$this->_async) {
$res = $this->handleResponse(fread($socket, 2048));
if ($res["status"] != "200") {
$this->_handleError($res["status"], $res["body"]);
$success = false;
}
}
return $success;
}
/**
* Parse the response from a socket write (only used for debugging)
* @param $response
* @return array
*/
private function handleResponse($response) {
$lines = explode("\n", $response);
// extract headers
$headers = array();
foreach($lines as $line) {
$kvsplit = explode(":", $line);
if (count($kvsplit) == 2) {
$header = $kvsplit[0];
$value = $kvsplit[1];
$headers[$header] = trim($value);
}
}
// extract status
$line_one_exploded = explode(" ", $lines[0]);
$status = $line_one_exploded[1];
// extract body
$body = $lines[count($lines) - 1];
// if the connection has been closed lets kill the socket
if (isset($headers["Connection"]) and $headers['Connection'] == "close") {
$this->_destroySocket();
if ($this->_debug()) {
$this->_log("Server told us connection closed so lets destroy the socket so it'll reconnect on next call");
}
}
$ret = array(
"status" => $status,
"body" => $body,
);
return $ret;
}
}

View File

@ -0,0 +1,313 @@
<?php
require_once(dirname(__FILE__) . "/Base/MixpanelBase.php");
require_once(dirname(__FILE__) . "/Producers/MixpanelPeople.php");
require_once(dirname(__FILE__) . "/Producers/MixpanelEvents.php");
require_once(dirname(__FILE__) . "/Producers/MixpanelGroups.php");
/**
* This is the main class for the Mixpanel PHP Library which provides all of the methods you need to track events,
* create/update profiles and group profiles.
*
* Architecture
* -------------
*
* This library is built such that all messages are buffered in an in-memory "queue"
* The queue will be automatically flushed at the end of every request. Alternatively, you can call "flush()" manually
* at any time. Flushed messages will be passed to a Consumer's "persist" method. The library comes with a handful of
* Consumers. The "CurlConsumer" is used by default which will send the messages to Mixpanel using forked cURL processes.
* You can implement your own custom Consumer to customize how a message is sent to Mixpanel. This can be useful when
* you want to put messages onto a distributed queue (such as ActiveMQ or Kestrel) instead of writing to Mixpanel in
* the user thread.
*
* Options
* -------------
*
* <table width="100%" cellpadding="5">
* <tr>
* <th>Option</th>
* <th>Description</th>
* <th>Default</th>
* </tr>
* <tr>
* <td>max_queue_size</td>
* <td>The maximum number of items to buffer in memory before flushing</td>
* <td>1000</td>
* </tr>
* <tr>
* <td>debug</td>
* <td>Enable/disable debug mode</td>
* <td>false</td>
* </tr>
* <tr>
* <td>consumer</td>
* <td>The consumer to use for writing messages</td>
* <td>curl</td>
* </tr>
* <tr>
* <td>consumers</td>
* <td>An array of custom consumers in the format array(consumer_key => class_name)</td>
* <td>null</td>
* </tr>
* <tr>
* <td>host</td>
* <td>The host name for api calls (used by some consumers)</td>
* <td>api.mixpanel.com</td>
* </tr>
* <tr>
* <td>events_endpoint</td>
* <td>The endpoint for tracking events (relative to the host)</td>
* <td>/events</td>
* </tr>
* <tr>
* <td>people_endpoint</td>
* <td>The endpoint for making people updates (relative to the host)</td>
* <td>/engage</td>
* </tr>
* <tr>
* <td>use_ssl</td>
* <td>Tell the consumer whether or not to use ssl (when available)</td>
* <td>true</td>
* </tr>
* <tr>
* <td>error_callback</td>
* <td>The name of a function to be called on consumption failures</td>
* <td>null</td>
* </tr>
* <tr>
* <td>connect_timeout</td>
* <td>In both the SocketConsumer and CurlConsumer, this is used for the connection timeout (i.e. How long it has take to actually make a connection).
* <td>5</td>
* </tr>
* <tr>
* <td>timeout</td>
* <td>In the CurlConsumer (non-forked), it is used to determine how long the cURL call has to execute.
* <td>30</td>
* </tr>
* </table>
*
* Example: Tracking an Event
* -------------
*
* $mp = Mixpanel::getInstance("MY_TOKEN");
*
* $mp->track("My Event");
*
* Example: Setting Profile Properties
* -------------
*
* $mp = Mixpanel::getInstance("MY_TOKEN", array("use_ssl" => false));
*
* $mp->people->set(12345, array(
* '$first_name' => "John",
* '$last_name' => "Doe",
* '$email' => "john.doe@example.com",
* '$phone' => "5555555555",
* 'Favorite Color' => "red"
* ));
*
*/
class Mixpanel extends Base_MixpanelBase {
/**
* An instance of the MixpanelPeople class (used to create/update profiles)
* @var Producers_MixpanelPeople
*/
public $people;
/**
* An instance of the MixpanelEvents class
* @var Producers_MixpanelEvents
*/
private $_events;
/**
* An instance of the MixpanelGroups class (used to create/update group profiles)
* @var Producers_MixpanelPeople
*/
public $group;
/**
* Instances' list of the Mixpanel class (for singleton use, splitted by token)
* @var Mixpanel[]
*/
private static $_instances = array();
/**
* Instantiates a new Mixpanel instance.
* @param $token
* @param array $options
*/
public function __construct($token, $options = array()) {
parent::__construct($options);
$this->people = new Producers_MixpanelPeople($token, $options);
$this->_events = new Producers_MixpanelEvents($token, $options);
$this->group = new Producers_MixpanelGroups($token, $options);
}
/**
* Returns a singleton instance of Mixpanel
* @param $token
* @param array $options
* @return Mixpanel
*/
public static function getInstance($token, $options = array()) {
if(!isset(self::$_instances[$token])) {
self::$_instances[$token] = new Mixpanel($token, $options);
}
return self::$_instances[$token];
}
/**
* Add an array representing a message to be sent to Mixpanel to the in-memory queue.
* @param array $message
*/
public function enqueue($message = array()) {
$this->_events->enqueue($message);
}
/**
* Add an array representing a list of messages to be sent to Mixpanel to a queue.
* @param array $messages
*/
public function enqueueAll($messages = array()) {
$this->_events->enqueueAll($messages);
}
/**
* Flush the events queue
* @param int $desired_batch_size
*/
public function flush($desired_batch_size = 50) {
$this->_events->flush($desired_batch_size);
}
/**
* Empty the events queue
*/
public function reset() {
$this->_events->reset();
}
/**
* Identify the user you want to associate to tracked events. The $anon_id must be UUID v4 format and not already merged to an $identified_id.
* All identify calls with a new and valid $anon_id will trigger a track $identify event, and merge to the $identified_id.
* @param string|int $user_id
* @param string|int $anon_id [optional]
*/
public function identify($user_id, $anon_id = null) {
$this->_events->identify($user_id, $anon_id);
}
/**
* Track an event defined by $event associated with metadata defined by $properties
* @param string $event
* @param array $properties
*/
public function track($event, $properties = array()) {
$this->_events->track($event, $properties);
}
/**
* Register a property to be sent with every event.
*
* If the property has already been registered, it will be
* overwritten. NOTE: Registered properties are only persisted for the life of the Mixpanel class instance.
* @param string $property
* @param mixed $value
*/
public function register($property, $value) {
$this->_events->register($property, $value);
}
/**
* Register multiple properties to be sent with every event.
*
* If any of the properties have already been registered,
* they will be overwritten. NOTE: Registered properties are only persisted for the life of the Mixpanel class
* instance.
* @param array $props_and_vals
*/
public function registerAll($props_and_vals = array()) {
$this->_events->registerAll($props_and_vals);
}
/**
* Register a property to be sent with every event.
*
* If the property has already been registered, it will NOT be
* overwritten. NOTE: Registered properties are only persisted for the life of the Mixpanel class instance.
* @param $property
* @param $value
*/
public function registerOnce($property, $value) {
$this->_events->registerOnce($property, $value);
}
/**
* Register multiple properties to be sent with every event.
*
* If any of the properties have already been registered,
* they will NOT be overwritten. NOTE: Registered properties are only persisted for the life of the Mixpanel class
* instance.
* @param array $props_and_vals
*/
public function registerAllOnce($props_and_vals = array()) {
$this->_events->registerAllOnce($props_and_vals);
}
/**
* Un-register an property to be sent with every event.
* @param string $property
*/
public function unregister($property) {
$this->_events->unregister($property);
}
/**
* Un-register a list of properties to be sent with every event.
* @param array $properties
*/
public function unregisterAll($properties) {
$this->_events->unregisterAll($properties);
}
/**
* Get a property that is set to be sent with every event
* @param string $property
* @return mixed
*/
public function getProperty($property)
{
return $this->_events->getProperty($property);
}
/**
* An alias to be merged with the distinct_id. Each alias can only map to one distinct_id.
* This is helpful when you want to associate a generated id (such as a session id) to a user id or username.
* @param string|int $distinct_id
* @param string|int $alias
*/
public function createAlias($distinct_id, $alias) {
$this->_events->createAlias($distinct_id, $alias);
}
}

View File

@ -0,0 +1,231 @@
<?php
require_once(dirname(__FILE__) . "/../Base/MixpanelBase.php");
require_once(dirname(__FILE__) . "/../ConsumerStrategies/FileConsumer.php");
require_once(dirname(__FILE__) . "/../ConsumerStrategies/CurlConsumer.php");
require_once(dirname(__FILE__) . "/../ConsumerStrategies/SocketConsumer.php");
if (!function_exists('json_encode')) {
throw new Exception('The JSON PHP extension is required.');
}
/**
* Provides some base methods for use by a message Producer
*/
abstract class Producers_MixpanelBaseProducer extends Base_MixpanelBase {
/**
* @var string a token associated to a Mixpanel project
*/
protected $_token;
/**
* @var array a queue to hold messages in memory before flushing in batches
*/
private $_queue = array();
/**
* @var ConsumerStrategies_AbstractConsumer the consumer to use when flushing messages
*/
private $_consumer = null;
/**
* @var array The list of available consumers
*/
private $_consumers = array(
"file" => "ConsumerStrategies_FileConsumer",
"curl" => "ConsumerStrategies_CurlConsumer",
"socket" => "ConsumerStrategies_SocketConsumer"
);
/**
* If the queue reaches this size we'll auto-flush to prevent out of memory errors
* @var int
*/
protected $_max_queue_size = 1000;
/**
* Creates a new MixpanelBaseProducer, assings Mixpanel project token, registers custom Consumers, and instantiates
* the desired consumer
* @param $token
* @param array $options
*/
public function __construct($token, $options = array()) {
parent::__construct($options);
// register any customer consumers
if (isset($options["consumers"])) {
$this->_consumers = array_merge($this->_consumers, $options['consumers']);
}
// set max queue size
if (isset($options["max_queue_size"])) {
$this->_max_queue_size = $options['max_queue_size'];
}
// associate token
$this->_token = $token;
if ($this->_debug()) {
$this->_log("Using token: ".$this->_token);
}
// instantiate the chosen consumer
$this->_consumer = $this->_getConsumer();
}
/**
* Flush the queue when we destruct the client with retries
*/
public function __destruct() {
$attempts = 0;
$max_attempts = 10;
$success = false;
while (!$success && $attempts < $max_attempts) {
if ($this->_debug()) {
$this->_log("destruct flush attempt #".($attempts+1));
}
$success = $this->flush();
$attempts++;
}
}
/**
* Iterate the queue and write in batches using the instantiated Consumer Strategy
* @param int $desired_batch_size
* @return bool whether or not the flush was successful
*/
public function flush($desired_batch_size = 50) {
$queue_size = count($this->_queue);
$succeeded = true;
$num_threads = $this->_consumer->getNumThreads();
if ($this->_debug()) {
$this->_log("Flush called - queue size: ".$queue_size);
}
while($queue_size > 0 && $succeeded) {
$batch_size = min(array($queue_size, $desired_batch_size*$num_threads, $this->_options['max_batch_size']*$num_threads));
$batch = array_splice($this->_queue, 0, $batch_size);
$succeeded = $this->_persist($batch);
if (!$succeeded) {
if ($this->_debug()) {
$this->_log("Batch consumption failed!");
}
$this->_queue = array_merge($batch, $this->_queue);
if ($this->_debug()) {
$this->_log("added batch back to queue, queue size is now $queue_size");
}
}
$queue_size = count($this->_queue);
if ($this->_debug()) {
$this->_log("Batch of $batch_size consumed, queue size is now $queue_size");
}
}
return $succeeded;
}
/**
* Empties the queue without persisting any of the messages
*/
public function reset() {
$this->_queue = array();
}
/**
* Returns the in-memory queue
* @return array
*/
public function getQueue() {
return $this->_queue;
}
/**
* Returns the current Mixpanel project token
* @return string
*/
public function getToken() {
return $this->_token;
}
/**
* Given a strategy type, return a new PersistenceStrategy object
* @return ConsumerStrategies_AbstractConsumer
*/
protected function _getConsumer() {
$key = $this->_options['consumer'];
$Strategy = $this->_consumers[$key];
if ($this->_debug()) {
$this->_log("Using consumer: " . $key . " -> " . $Strategy);
}
$this->_options['endpoint'] = $this->_getEndpoint();
return new $Strategy($this->_options);
}
/**
* Add an array representing a message to be sent to Mixpanel to a queue.
* @param array $message
*/
public function enqueue($message = array()) {
array_push($this->_queue, $message);
// force a flush if we've reached our threshold
if (count($this->_queue) > $this->_max_queue_size) {
$this->flush();
}
if ($this->_debug()) {
$this->_log("Queued message: ".json_encode($message));
}
}
/**
* Add an array representing a list of messages to be sent to Mixpanel to a queue.
* @param array $messages
*/
public function enqueueAll($messages = array()) {
foreach($messages as $message) {
$this->enqueue($message);
}
}
/**
* Given an array of messages, persist it with the instantiated Persistence Strategy
* @param $message
* @return mixed
*/
protected function _persist($message) {
return $this->_consumer->persist($message);
}
/**
* Return the endpoint that should be used by a consumer that consumes messages produced by this producer.
* @return string
*/
abstract function _getEndpoint();
}

View File

@ -0,0 +1,192 @@
<?php
require_once(dirname(__FILE__) . "/MixpanelBaseProducer.php");
require_once(dirname(__FILE__) . "/MixpanelPeople.php");
require_once(dirname(__FILE__) . "/../ConsumerStrategies/CurlConsumer.php");
/**
* Provides an API to track events on Mixpanel
*/
class Producers_MixpanelEvents extends Producers_MixpanelBaseProducer {
/**
* An array of properties to attach to every tracked event
* @var array
*/
private $_super_properties = array("mp_lib" => "php");
/**
* Track an event defined by $event associated with metadata defined by $properties
* @param string $event
* @param array $properties
*/
public function track($event, $properties = array()) {
// if no token is passed in, use current token
if (!isset($properties["token"])) $properties['token'] = $this->_token;
// if no time is passed in, use the current time
if (!isset($properties["time"])) $properties['time'] = microtime(true);
$params['event'] = $event;
$params['properties'] = array_merge($this->_super_properties, $properties);
$this->enqueue($params);
}
/**
* Register a property to be sent with every event. If the property has already been registered, it will be
* overwritten.
* @param string $property
* @param mixed $value
*/
public function register($property, $value) {
$this->_super_properties[$property] = $value;
}
/**
* Register multiple properties to be sent with every event. If any of the properties have already been registered,
* they will be overwritten.
* @param array $props_and_vals
*/
public function registerAll($props_and_vals = array()) {
foreach($props_and_vals as $property => $value) {
$this->register($property, $value);
}
}
/**
* Register a property to be sent with every event. If the property has already been registered, it will NOT be
* overwritten.
* @param $property
* @param $value
*/
public function registerOnce($property, $value) {
if (!isset($this->_super_properties[$property])) {
$this->register($property, $value);
}
}
/**
* Register multiple properties to be sent with every event. If any of the properties have already been registered,
* they will NOT be overwritten.
* @param array $props_and_vals
*/
public function registerAllOnce($props_and_vals = array()) {
foreach($props_and_vals as $property => $value) {
if (!isset($this->_super_properties[$property])) {
$this->register($property, $value);
}
}
}
/**
* Un-register an property to be sent with every event.
* @param string $property
*/
public function unregister($property) {
unset($this->_super_properties[$property]);
}
/**
* Un-register a list of properties to be sent with every event.
* @param array $properties
*/
public function unregisterAll($properties) {
foreach($properties as $property) {
$this->unregister($property);
}
}
/**
* Get a property that is set to be sent with every event
* @param string $property
* @return mixed
*/
public function getProperty($property) {
return $this->_super_properties[$property];
}
/**
* Identify the user you want to associate to tracked events. The $anon_id must be UUID v4 format and not already merged to an $identified_id.
* All identify calls with a new and valid $anon_id will trigger a track $identify event, and merge to the $identified_id.
* @param string|int $user_id
* @param string|int $anon_id [optional]
*/
public function identify($user_id, $anon_id = null) {
$this->register("distinct_id", $user_id);
$UUIDv4 = '/^[a-zA-Z0-9]*-[a-zA-Z0-9]*-[a-zA-Z0-9]*-[a-zA-Z0-9]*-[a-zA-Z0-9]*$/i';
if (!empty($anon_id)) {
if (preg_match($UUIDv4, $anon_id) !== 1) {
/* not a valid uuid */
error_log("Running Identify method (identified_id: $user_id, anon_id: $anon_id) failed, anon_id not in UUID v4 format");
} else {
$this->track('$identify', array(
'$identified_id' => $user_id,
'$anon_id' => $anon_id
));
}
}
}
/**
* An alias to be merged with the distinct_id. Each alias can only map to one distinct_id.
* This is helpful when you want to associate a generated id (such as a session id) to a user id or username.
*
* Because aliasing can be extremely vulnerable to race conditions and ordering issues, we'll make a synchronous
* call directly to Mixpanel when this method is called. If it fails we'll throw an Exception as subsequent
* events are likely to be incorrectly tracked.
* @param string|int $distinct_id
* @param string|int $alias
* @return array $msg
* @throws Exception
*/
public function createAlias($distinct_id, $alias) {
$msg = array(
"event" => '$create_alias',
"properties" => array("distinct_id" => $distinct_id, "alias" => $alias, "token" => $this->_token)
);
// Save the current fork/async options
$old_fork = isset($this->_options['fork']) ? $this->_options['fork'] : false;
$old_async = isset($this->_options['async']) ? $this->_options['async'] : false;
// Override fork/async to make the new consumer synchronous
$this->_options['fork'] = false;
$this->_options['async'] = false;
// The name is ambiguous, but this creates a new consumer with current $this->_options
$consumer = $this->_getConsumer();
$success = $consumer->persist(array($msg));
// Restore the original fork/async settings
$this->_options['fork'] = $old_fork;
$this->_options['async'] = $old_async;
if (!$success) {
error_log("Creating Mixpanel Alias (distinct id: $distinct_id, alias: $alias) failed");
throw new Exception("Tried to create an alias but the call was not successful");
} else {
return $msg;
}
}
/**
* Returns the "events" endpoint
* @return string
*/
function _getEndpoint() {
return $this->_options['events_endpoint'];
}
}

View File

@ -0,0 +1,103 @@
<?php
require_once(dirname(__FILE__) . "/MixpanelBaseProducer.php");
/**
* Provides an API to create/update group profiles on Mixpanel
*/
class Producers_MixpanelGroups extends Producers_MixpanelBaseProducer {
/**
* Internal method to prepare a message given the message data
* @param $group_key
* @param $group_id
* @param $operation
* @param $value
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the group Profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @return array
*/
private function _constructPayload($group_key, $group_id, $operation, $value, $ignore_time = false) {
$payload = array(
'$token' => $this->_token,
'$group_key' => $group_key,
'$group_id' => $group_id,
'$time' => microtime(true),
$operation => $value
);
if ($ignore_time === true) $payload['$ignore_time'] = true;
return $payload;
}
/**
* Set properties on a group profile. If the group profile does not exist, it creates it with these properties.
* If it does exist, it sets the properties to these values, overwriting existing values.
* @param string|int $group_key the group_key used for groups in Project Settings
* @param string|int $group_id the group id used for the group profile
* @param array $props associative array of properties to set on the group profile
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the group profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
*/
public function set($group_key, $group_id, $props, $ignore_time = false) {
$payload = $this->_constructPayload($group_key, $group_id, '$set', $props, $ignore_time);
$this->enqueue($payload);
}
/**
* Set properties on a group profile. If the Group profile does not exist, it creates it with these properties.
* If it does exist, it sets the properties to these values but WILL NOT overwrite existing values.
* @param string|int $group_key the group_key used for groups in Project Settings
* @param string|int $group_id the group id used for the group profile
* @param array $props associative array of properties to set on the group profile
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the group profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
*/
public function setOnce($group_key, $group_id, $props, $ignore_time = false) {
$payload = $this->_constructPayload($group_key, $group_id, '$set_once', $props, $ignore_time);
$this->enqueue($payload);
}
/**
* Unset properties on a group profile. If the group does not exist, it creates it with no properties.
* If it does exist, it unsets these properties. NOTE: In other libraries we use 'unset' which is
* a reserved word in PHP.
* @param string|int $group_key the group_key used for groups in Project Settings
* @param string|int $group_id the group id used for the group profile
* @param array $props associative array of properties to unset on the group profile
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the group profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
*/
public function remove($group_key, $group_id, $props, $ignore_time = false) {
$payload = $this->_constructPayload($group_key, $group_id, '$remove', $props, $ignore_time);
$this->enqueue($payload);
}
/**
* Adds $val to a list located at $prop. If the property does not exist, it will be created. If $val is a string
* and the list is empty or does not exist, a new list with one value will be created.
* @param string|int $group_key the group_key used for groups in Project Settings
* @param string|int $group_id the group id used for the group profile
* @param string $prop the property that holds the list
* @param string|array $val items to add to the list
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the group profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
*/
public function union($group_key, $group_id, $prop, $val, $ignore_time = false) {
$payload = $this->_constructPayload($group_key, $group_id, '$union', array("$prop" => $val), $ignore_time);
$this->enqueue($payload);
}
/**
* Delete this group profile from Mixpanel
* @param string|int $group_key the group_key used for groups in Project Settings
* @param string|int $group_id the group id used for the group profile
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
*/
public function deleteGroup($group_key, $group_id, $ignore_time = false) {
$payload = $this->_constructPayload($group_key, $group_id, '$delete', "", $ignore_time);
$this->enqueue($payload);
}
/**
* Returns the "groups" endpoint
* @return string
*/
function _getEndpoint() {
return $this->_options['groups_endpoint'];
}
}

View File

@ -0,0 +1,159 @@
<?php
require_once(dirname(__FILE__) . "/MixpanelBaseProducer.php");
/**
* Provides an API to create/update profiles on Mixpanel
*/
class Producers_MixpanelPeople extends Producers_MixpanelBaseProducer {
/**
* Internal method to prepare a message given the message data
* @param $distinct_id
* @param $operation
* @param $value
* @param null $ip
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
* @return array
*/
private function _constructPayload($distinct_id, $operation, $value, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = array(
'$token' => $this->_token,
'$distinct_id' => $distinct_id,
'$time' => microtime(true),
$operation => $value
);
if ($ip !== null) $payload['$ip'] = $ip;
if ($ignore_time === true) $payload['$ignore_time'] = true;
if ($ignore_alias === true) $payload['$ignore_alias'] = true;
return $payload;
}
/**
* Set properties on a user record. If the profile does not exist, it creates it with these properties.
* If it does exist, it sets the properties to these values, overwriting existing values.
* @param string|int $distinct_id the distinct_id or alias of a user
* @param array $props associative array of properties to set on the profile
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function set($distinct_id, $props, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$set', $props, $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Set properties on a user record. If the profile does not exist, it creates it with these properties.
* If it does exist, it sets the properties to these values but WILL NOT overwrite existing values.
* @param string|int $distinct_id the distinct_id or alias of a user
* @param array $props associative array of properties to set on the profile
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function setOnce($distinct_id, $props, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$set_once', $props, $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Unset properties on a user record. If the profile does not exist, it creates it with no properties.
* If it does exist, it unsets these properties. NOTE: In other libraries we use 'unset' which is
* a reserved word in PHP.
* @param string|int $distinct_id the distinct_id or alias of a user
* @param array $props associative array of properties to unset on the profile
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function remove($distinct_id, $props, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$unset', $props, $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Increments the value of a property on a user record. If the profile does not exist, it creates it and sets the
* property to the increment value.
* @param string|int $distinct_id the distinct_id or alias of a user
* @param $prop string the property to increment
* @param int $val the amount to increment the property by
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function increment($distinct_id, $prop, $val, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$add', array("$prop" => $val), $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Adds $val to a list located at $prop. If the property does not exist, it will be created. If $val is a string
* and the list is empty or does not exist, a new list with one value will be created.
* @param string|int $distinct_id the distinct_id or alias of a user
* @param string $prop the property that holds the list
* @param string|array $val items to add to the list
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function append($distinct_id, $prop, $val, $ip = null, $ignore_time = false, $ignore_alias = false) {
$operation = gettype($val) == "array" ? '$union' : '$append';
$payload = $this->_constructPayload($distinct_id, $operation, array("$prop" => $val), $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Adds a transaction to the user's profile for revenue tracking
* @param string|int $distinct_id the distinct_id or alias of a user
* @param string $amount the transaction amount e.g. "20.50"
* @param null $timestamp the timestamp of when the transaction occurred (default to current timestamp)
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function trackCharge($distinct_id, $amount, $timestamp = null, $ip = null, $ignore_time = false, $ignore_alias = false) {
$timestamp = $timestamp == null ? time() : $timestamp;
$date_iso = date("c", $timestamp);
$transaction = array(
'$time' => $date_iso,
'$amount' => $amount
);
$val = array('$transactions' => $transaction);
$payload = $this->_constructPayload($distinct_id, '$append', $val, $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Clear all transactions stored on a user's profile
* @param string|int $distinct_id the distinct_id or alias of a user
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function clearCharges($distinct_id, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$set', array('$transactions' => array()), $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Delete this profile from Mixpanel
* @param string|int $distinct_id the distinct_id or alias of a user
* @param string|null $ip the ip address of the client (used for geo-location)
* @param boolean $ignore_time If the $ignore_time property is true, Mixpanel will not automatically update the "Last Seen" property of the profile. Otherwise, Mixpanel will add a "Last Seen" property associated with the current time
* @param boolean $ignore_alias If the $ignore_alias property is true, an alias look up will not be performed after ingestion. Otherwise, a lookup for the distinct ID will be performed, and replaced if a match is found
*/
public function deleteUser($distinct_id, $ip = null, $ignore_time = false, $ignore_alias = false) {
$payload = $this->_constructPayload($distinct_id, '$delete', "", $ip, $ignore_time, $ignore_alias);
$this->enqueue($payload);
}
/**
* Returns the "engage" endpoint
* @return string
*/
function _getEndpoint() {
return $this->_options['people_endpoint'];
}
}