'hourly',
		'activitypub_cleanup_remote_actors'        => 'daily',
		'activitypub_reprocess_outbox'             => 'hourly',
		'activitypub_outbox_purge'                 => 'daily',
		'activitypub_inbox_purge'                  => 'daily',
		'activitypub_ap_post_purge'                => 'daily',
		'activitypub_tombstone_purge'              => 'daily',
		'activitypub_sync_blocklist_subscriptions' => 'weekly',
	);

	/**
	 * Callbacks that may be executed through async_batch(), keyed by cron hook name.
	 *
	 * @var array
	 */
	private static $batch_callbacks = array();

	/**
	 * Returns how long to wait before the next async batch is run.
	 *
	 * @return int The pause in seconds.
	 */
	public static function get_retry_delay() {
		$default_pause = 30;

		/**
		 * Filters the pause between async batches (in seconds).
		 *
		 * @param int $async_batch_pause The pause in seconds. Default 30.
		 */
		return apply_filters( 'activitypub_scheduler_async_batch_pause', $default_pause );
	}

	/**
	 * Bootstraps the scheduler: loads the sub-schedulers and attaches all WordPress hooks.
	 */
	public static function init() {
		self::register_schedulers();

		// Custom cron schedules.
		\add_filter( 'cron_schedules', array( self::class, 'add_cron_schedules' ) );

		/*
		 * Every action below is attached at priority 10. Each row reads
		 * array( hook name, callback, number of accepted arguments ). Rows are
		 * registered top to bottom, so callbacks sharing a hook keep their relative order.
		 */
		$actions = array(
			// Follower Cleanups.
			array( 'activitypub_update_remote_actors', array( self::class, 'update_remote_actors' ), 1 ),
			array( 'activitypub_cleanup_remote_actors', array( self::class, 'cleanup_remote_actors' ), 1 ),

			// Event callbacks.
			array( 'activitypub_async_batch', array( self::class, 'async_batch' ), 99 ),
			array( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ), 1 ),
			array( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ), 1 ),
			array( 'activitypub_inbox_purge', array( self::class, 'purge_inbox' ), 1 ),
			array( 'activitypub_ap_post_purge', array( self::class, 'purge_ap_posts' ), 1 ),
			array( 'activitypub_tombstone_purge', array( self::class, 'purge_tombstones' ), 1 ),
			array( 'activitypub_inbox_create_item', array( self::class, 'process_inbox_activity' ), 1 ),
			array( 'activitypub_sync_blocklist_subscriptions', array( Blocklist_Subscriptions::class, 'sync_all' ), 1 ),

			// Only the outbox item ID is forwarded here, so the method's default offset applies.
			array( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_outbox_activity_for_federation' ), 1 ),
			array( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_announce_activity' ), 4 ),

			// Option changes receive the old and the new value.
			array( 'update_option_activitypub_outbox_purge_days', array( self::class, 'update_outbox_purge_schedule' ), 2 ),
			array( 'update_option_activitypub_inbox_purge_days', array( self::class, 'update_inbox_purge_schedule' ), 2 ),
			array( 'update_option_activitypub_ap_post_purge_days', array( self::class, 'update_ap_post_purge_schedule' ), 2 ),
		);

		foreach ( $actions as list( $hook, $callback, $accepted_args ) ) {
			\add_action( $hook, $callback, 10, $accepted_args );
		}
	}

	/**
	 * Initializes the bundled sub-schedulers and lets extensions add their own.
	 */
	public static function register_schedulers() {
		$schedulers = array(
			Post::class,
			Actor::class,
			Collection_Sync::class,
			Comment::class,
			Statistics::class,
		);

		foreach ( $schedulers as $scheduler ) {
			$scheduler::init();
		}

		/**
		 * Register additional schedulers.
		 *
		 * @since 5.0.0
		 */
		\do_action( 'activitypub_register_schedulers' );
	}

	/**
	 * Add custom cron schedules.
	 *
	 * @param array $schedules Existing cron schedules.
	 *
	 * @return array Modified cron schedules.
*/
	public static function add_cron_schedules( $schedules ) {
		$monthly = array(
			'interval' => MONTH_IN_SECONDS,
			'display'  => \__( 'Once Monthly', 'activitypub' ),
		);

		$yearly = array(
			'interval' => YEAR_IN_SECONDS,
			'display'  => \__( 'Once Yearly', 'activitypub' ),
		);

		$schedules['monthly'] = $monthly;
		$schedules['yearly']  = $yearly;

		return $schedules;
	}

	/**
	 * Registers the callback that async_batch() runs when the given cron hook fires.
	 *
	 * Registration is refused once the `init` action has completed, and
	 * non-callable values are ignored.
	 *
	 * @param string   $hook     The cron event hook name.
	 * @param callable $callback The callback to execute.
	 */
	public static function register_async_batch_callback( $hook, $callback ) {
		$init_already_finished = \did_action( 'init' ) && ! \doing_action( 'init' );

		if ( $init_already_finished ) {
			\_doing_it_wrong( __METHOD__, 'Async batch callbacks should be registered before or during the init action.', '7.5.0' );
			return;
		}

		if ( \is_callable( $callback ) ) {
			self::$batch_callbacks[ $hook ] = $callback;

			// Route the cron hook through async_batch(), which looks the callback up by hook name.
			\add_action( $hook, array( self::class, 'async_batch' ), 10, 99 );
		}
	}

	/**
	 * Schedules every recurring ActivityPub cron event that is not scheduled yet.
	 */
	public static function register_schedules() {
		foreach ( self::SCHEDULES as $hook => $recurrence ) {
			if ( \wp_next_scheduled( $hook ) ) {
				continue;
			}

			\wp_schedule_event( time(), $recurrence, $hook );
		}

		// Monthly stats collection, first run on the next 1st of the month.
		$monthly_hook = 'activitypub_collect_monthly_stats';
		if ( ! \wp_next_scheduled( $monthly_hook ) ) {
			\wp_schedule_event( self::get_next_first_of_month(), 'monthly', $monthly_hook );
		}

		// Annual stats compilation (wrapped notification), first run on the next December 1st.
		$annual_hook = 'activitypub_compile_annual_stats';
		if ( ! \wp_next_scheduled( $annual_hook ) ) {
			\wp_schedule_event( self::get_next_december_first(), 'yearly', $annual_hook );
		}
	}

	/**
	 * Un-schedule all ActivityPub schedules.
*
	 * @return void
	 */
	public static function deregister_schedules() {
		$hooks = array_keys( self::SCHEDULES );

		// Statistics schedules.
		$hooks[] = 'activitypub_collect_monthly_stats';
		$hooks[] = 'activitypub_compile_annual_stats';

		foreach ( $hooks as $hook ) {
			\wp_unschedule_hook( $hook );
		}
	}

	/**
	 * Calculates the first run of the monthly stats collection.
	 *
	 * NOTE(review): the base time comes from current_time( 'timestamp' ), which is
	 * offset by the site's timezone, while strtotime() resolves "02:00:00" in PHP's
	 * default timezone. Confirm the resulting moment is the intended one.
	 *
	 * @return int|false Timestamp of the next 1st of month at 2:00 AM, or false if strtotime() fails.
	 */
	private static function get_next_first_of_month() {
		$base_time = \current_time( 'timestamp' ); // phpcs:ignore WordPress.DateTime.CurrentTimeTimestamp.Requested

		return \strtotime( 'first day of next month 02:00:00', $base_time );
	}

	/**
	 * Calculates the first run of the annual stats compilation (wrapped notification).
	 *
	 * Returns this year's December 1st at 3:00 AM while that moment is still ahead of
	 * the current time, otherwise the same date in the following year.
	 *
	 * @return int|false Timestamp of the next December 1st at 3:00 AM, or false if strtotime() fails.
	 */
	private static function get_next_december_first() {
		$date_format  = '%d-12-01 03:00:00';
		$now          = \current_time( 'timestamp' ); // phpcs:ignore WordPress.DateTime.CurrentTimeTimestamp.Requested
		$current_year = (int) \gmdate( 'Y', $now );

		$upcoming = \strtotime( sprintf( $date_format, $current_year ) );

		// Still before this year's December 1st: use it.
		if ( $now < $upcoming ) {
			return $upcoming;
		}

		// Otherwise roll over to next year.
		return \strtotime( sprintf( $date_format, $current_year + 1 ) );
	}

	/**
	 * Unschedule events for an outbox item.
	 *
	 * @param int $outbox_item_id The outbox item ID.
*/
	public static function unschedule_events_for_item( $outbox_item_id ) {
		/*
		 * Arguments of the pending "send" batch: item ID, batch size and stored offset.
		 *
		 * NOTE(review): post meta is returned as a string while the fallback is the
		 * integer 0. Cron lookups match on the serialized arguments, so confirm the
		 * event is scheduled with the same value type.
		 */
		$event_args = array(
			$outbox_item_id,
			Dispatcher::get_batch_size(),
			\get_post_meta( $outbox_item_id, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
		);

		\delete_post_meta( $outbox_item_id, '_activitypub_outbox_offset' );

		// Only unschedule what is actually scheduled; wp_next_scheduled() returns false otherwise.
		$process_args = array( $outbox_item_id );
		$timestamp    = \wp_next_scheduled( 'activitypub_process_outbox', $process_args );
		if ( $timestamp ) {
			\wp_unschedule_event( $timestamp, 'activitypub_process_outbox', $process_args );
		}

		$timestamp = \wp_next_scheduled( 'activitypub_send_activity', $event_args );
		if ( $timestamp ) {
			\wp_unschedule_event( $timestamp, 'activitypub_send_activity', $event_args );
		}

		// Invalidate any retries for this outbox item.
		$crons = \_get_cron_array();
		if ( ! \is_array( $crons ) ) {
			return;
		}

		foreach ( $crons as $scheduled_at => $cron ) {
			if ( empty( $cron['activitypub_retry_activity'] ) || ! \is_array( $cron['activitypub_retry_activity'] ) ) {
				continue;
			}

			foreach ( $cron['activitypub_retry_activity'] as $event ) {
				// The second retry argument is compared with the outbox item ID.
				if ( ! isset( $event['args'][1] ) || ! \is_numeric( $event['args'][1] ) ) {
					continue;
				}

				// Compare as integers so an ID stored as "123" still matches 123.
				if ( (int) $outbox_item_id === (int) $event['args'][1] ) {
					\wp_unschedule_event( $scheduled_at, 'activitypub_retry_activity', $event['args'] );
				}
			}
		}
	}

	/**
	 * Refreshes a small batch of outdated remote Actors.
	 *
	 * Each actor's metadata is fetched again. A failed fetch is recorded as an
	 * error on the actor; a successful one is upserted and clears stored errors.
	 */
	public static function update_remote_actors() {
		// Larger batches when WP-Cron is disabled.
		$number = 5;
		if ( defined( 'DISABLE_WP_CRON' ) && DISABLE_WP_CRON ) {
			$number = 50;
		}

		/**
		 * Filter the number of remote Actors to update.
		 *
		 * @param int $number The number of remote Actors to update.
		 */
		$number = apply_filters( 'activitypub_update_remote_actors_number', $number );
		$actors = Remote_Actors::get_outdated( $number );

		foreach ( $actors as $actor ) {
			$meta = get_remote_metadata_by_actor( $actor->guid, false );

			if ( empty( $meta ) || ! is_array( $meta ) || is_wp_error( $meta ) ) {
				Remote_Actors::add_error( $actor->ID, 'Failed to fetch or parse metadata' );
				continue;
			}

			$id = Remote_Actors::upsert( $meta );

			// A failed upsert is skipped without recording an error.
			if ( \is_wp_error( $id ) ) {
				continue;
			}

			Remote_Actors::clear_errors( $id );
		}
	}

	/**
	 * Cleanup remote Actors.
*/
	public static function cleanup_remote_actors() {
		// Larger batches when WP-Cron is disabled.
		$number = 5;
		if ( defined( 'DISABLE_WP_CRON' ) && DISABLE_WP_CRON ) {
			$number = 50;
		}

		/**
		 * Filter the number of remote Actors to clean up.
		 *
		 * @param int $number The number of remote Actors to clean up.
		 */
		$number = apply_filters( 'activitypub_cleanup_remote_actors_number', $number );
		$actors = Remote_Actors::get_faulty( $number );

		foreach ( $actors as $actor ) {
			$meta = get_remote_metadata_by_actor( $actor->guid, false );

			if ( Tombstone::exists( $meta ) ) {
				// The remote side reports the actor as gone.
				\wp_delete_post( $actor->ID );
			} elseif ( empty( $meta ) || ! is_array( $meta ) || \is_wp_error( $meta ) ) {
				if ( Remote_Actors::count_errors( $actor->ID ) >= 5 ) {
					// Too many failures: queue removal of the actor's content, then delete the actor.
					\wp_schedule_single_event( \time(), 'activitypub_delete_remote_actor_interactions', array( $actor->guid ) );
					\wp_schedule_single_event( \time(), 'activitypub_delete_remote_actor_posts', array( $actor->guid ) );
					\wp_delete_post( $actor->ID );
				} else {
					Remote_Actors::add_error( $actor->ID, $meta );
				}
			} else {
				$id = Remote_Actors::upsert( $meta );
				if ( \is_wp_error( $id ) ) {
					Remote_Actors::add_error( $actor->ID, $id );
				} else {
					Remote_Actors::clear_errors( $actor->ID );
				}
			}
		}
	}

	/**
	 * Schedule the outbox item for federation.
	 *
	 * Nothing is scheduled when a matching event for the same item already exists.
	 *
	 * @param int $id     The ID of the outbox item.
	 * @param int $offset The offset to add to the scheduled time. Default 3 seconds.
	 */
	public static function schedule_outbox_activity_for_federation( $id, $offset = 3 ) {
		$hook = 'activitypub_process_outbox';
		$args = array( $id );

		if ( false === wp_next_scheduled( $hook, $args ) ) {
			\wp_schedule_single_event( \time() + $offset, $hook, $args );
		}
	}

	/**
	 * Reprocess the outbox.
	 *
	 * Looks at up to ten pending outbox items and schedules them for federation again.
	 */
	public static function reprocess_outbox() {
		$ids = \get_posts(
			array(
				'post_type'      => Outbox::POST_TYPE,
				'post_status'    => 'pending',
				'posts_per_page' => 10,
				'fields'         => 'ids',
			)
		);

		/*
		 * NOTE(review): both checks below `return`, which ends the whole run and leaves
		 * the remaining IDs unexamined. Confirm that `continue` is not what is intended.
		 */
		foreach ( $ids as $id ) {
			// Bail if there is a pending batch.
			$offset = \get_post_meta( $id, '_activitypub_outbox_offset', true ) ?: 0; // phpcs:ignore
			if ( \wp_next_scheduled( 'activitypub_send_activity', array( $id, Dispatcher::get_batch_size(), $offset ) ) ) {
				return;
			}

			// Bail if there is a batch in progress.
			$key = \md5( \serialize( $id ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
			if ( self::is_locked( $key ) ) {
				return;
			}

			self::schedule_outbox_activity_for_federation( $id );
		}
	}

	/**
	 * Purge outbox items based on a schedule.
	 */
	public static function purge_outbox() {
		Outbox::purge( \get_option( 'activitypub_outbox_purge_days', ACTIVITYPUB_OUTBOX_PURGE_DAYS ) );
	}

	/**
	 * Purge inbox items based on a schedule.
	 */
	public static function purge_inbox() {
		Inbox::purge( \get_option( 'activitypub_inbox_purge_days', ACTIVITYPUB_INBOX_PURGE_DAYS ) );
	}

	/**
	 * Purge remote posts based on a schedule.
	 */
	public static function purge_ap_posts() {
		Remote_Posts::purge( \get_option( 'activitypub_ap_post_purge_days', ACTIVITYPUB_AP_POST_PURGE_DAYS ) );
	}

	/**
	 * Daily cron handler that purges expired tombstones.
	 *
	 * Retention is non-urgent: large backlogs (e.g. after retention is first enforced)
	 * drain across multiple daily runs.
	 *
	 * @since 8.3.0
	 */
	public static function purge_tombstones() {
		\Activitypub\Tombstone::purge();
	}

	/**
	 * Process cached inbox activity.
	 *
	 * Retrieves all collected user IDs for an activity and processes them together.
	 * Nothing is fired when the stored inbox content cannot be decoded into an activity.
	 *
	 * @param string $activity_id The activity ID.
	 */
	public static function process_inbox_activity( $activity_id ) {
		// Deduplicate if multiple inbox items were created due to race condition.
		$inbox_item = Inbox::deduplicate( $activity_id );

		if ( ! $inbox_item ) {
			return;
		}

		$data = \json_decode( $inbox_item->post_content, true );

		// json_decode() yields null for invalid JSON; there is nothing to reconstruct then.
		if ( ! \is_array( $data ) ) {
			return;
		}

		// Reconstruct activity from inbox post.
		$activity = Activity::init_from_array( $data );

		// Guard the method calls below against a failed reconstruction.
		if ( ! \is_object( $activity ) || \is_wp_error( $activity ) ) {
			return;
		}

		$type     = \Activitypub\camel_to_snake_case( $activity->get_type() );
		$context  = Inbox::CONTEXT_INBOX;
		$user_ids = Inbox::get_recipients( $inbox_item->ID );

		/**
		 * Fires after any ActivityPub Inbox activity has been handled, regardless of activity type.
		 *
		 * This hook is triggered for all activity types processed by the inbox handler.
		 *
		 * @param array    $data     The data array.
		 * @param array    $user_ids The user IDs.
		 * @param string   $type     The type of the activity.
		 * @param Activity $activity The Activity object.
		 * @param int      $result   The ID of the inbox item that was created, or WP_Error if failed.
		 * @param string   $context  The context of the request ('inbox' or 'shared_inbox').
		 */
		\do_action( 'activitypub_handled_inbox', $data, $user_ids, $type, $activity, $inbox_item->ID, $context );

		/**
		 * Fires after an ActivityPub Inbox activity has been handled.
		 *
		 * @param array    $data     The data array.
		 * @param array    $user_ids The user IDs.
		 * @param Activity $activity The Activity object.
		 * @param int      $result   The ID of the inbox item that was created, or WP_Error if failed.
		 * @param string   $context  The context of the request ('inbox' or 'shared_inbox').
		 */
		\do_action( 'activitypub_handled_inbox_' . $type, $data, $user_ids, $activity, $inbox_item->ID, $context );
	}

	/**
	 * Update schedules when outbox purge days settings change.
	 *
	 * A value of 0 removes the daily purge event; any other value makes sure it is scheduled.
	 *
	 * @param int $old_value The old value.
	 * @param int $value     The new value.
	 */
	public static function update_outbox_purge_schedule( $old_value, $value ) {
		if ( 0 === (int) $value ) {
			\wp_clear_scheduled_hook( 'activitypub_outbox_purge' );
		} elseif ( ! \wp_next_scheduled( 'activitypub_outbox_purge' ) ) {
			\wp_schedule_event( \time(), 'daily', 'activitypub_outbox_purge' );
		}
	}

	/**
	 * Update schedules when inbox purge days settings change.
	 *
	 * @param int $old_value The old value.
	 * @param int $value The new value.
*/
	public static function update_inbox_purge_schedule( $old_value, $value ) {
		if ( 0 === (int) $value ) {
			\wp_clear_scheduled_hook( 'activitypub_inbox_purge' );
		} elseif ( ! \wp_next_scheduled( 'activitypub_inbox_purge' ) ) {
			\wp_schedule_event( \time(), 'daily', 'activitypub_inbox_purge' );
		}
	}

	/**
	 * Update schedules when remote posts purge days settings change.
	 *
	 * A value of 0 removes the daily purge event; any other value makes sure it is scheduled.
	 *
	 * @param int $old_value The old value.
	 * @param int $value     The new value.
	 */
	public static function update_ap_post_purge_schedule( $old_value, $value ) {
		if ( 0 === (int) $value ) {
			\wp_clear_scheduled_hook( 'activitypub_ap_post_purge' );
		} elseif ( ! \wp_next_scheduled( 'activitypub_ap_post_purge' ) ) {
			\wp_schedule_event( \time(), 'daily', 'activitypub_ap_post_purge' );
		}
	}

	/**
	 * Asynchronously runs batch processing routines.
	 *
	 * The batching part is optional and only comes into play if the callback returns anything.
	 * Beyond that it's a helper to run a callback asynchronously with locking to prevent simultaneous processing.
	 *
	 * The callback is the one registered for the current action, or else the first
	 * argument. While another run holds the lock, the event is re-queued one minute later.
	 *
	 * @param mixed ...$args Optional. Parameters that get passed to the callback.
	 */
	public static function async_batch() {
		$args     = \func_get_args(); // phpcs:ignore PHPCompatibility.FunctionUse.ArgumentFunctionsReportCurrentValue
		$callback = self::$batch_callbacks[ \current_action() ] ?? $args[0] ?? null;
		if ( ! \is_callable( $callback ) ) {
			\_doing_it_wrong( __METHOD__, 'There must be a valid callback associated with the current action.', '5.2.0' );
			return;
		}

		$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

		// Bail if the existing lock is still valid.
		if ( self::is_locked( $key ) ) {
			\wp_schedule_single_event( \time() + MINUTE_IN_SECONDS, \current_action(), $args );
			return;
		}

		self::lock( $key );

		if ( \is_callable( $args[0] ?? null ) ) {
			$callback = \array_shift( $args ); // Remove $callback from arguments.
		}

		try {
			$next = \call_user_func_array( $callback, $args );
		} finally {
			// Release the lock even when the callback throws, so it is not held until the lock expires.
			self::unlock( $key );
		}

		if ( ! empty( $next ) ) {
			// Schedule the next run, adding the result to the arguments.
			\wp_schedule_single_event( \time() + self::get_retry_delay(), \current_action(), \array_values( $next ) );
		}
	}

	/**
	 * Locks the async batch process for individual callbacks to prevent simultaneous processing.
	 *
	 * The lock is an options-table row named `activitypub_async_batch_{$key}` holding
	 * the time the lock was taken. It is written with INSERT IGNORE.
	 *
	 * @param string $key Serialized callback name.
	 * @return bool|int True if the lock was successful, timestamp of existing lock otherwise.
	 */
	public static function lock( $key ) {
		global $wpdb;

		// Try to lock.
		$lock_result = (bool) $wpdb->query( $wpdb->prepare( "INSERT IGNORE INTO `$wpdb->options` ( `option_name`, `option_value`, `autoload` ) VALUES (%s, %s, 'no') /* LOCK */", 'activitypub_async_batch_' . $key, \time() ) ); // phpcs:ignore WordPress.DB

		// No row written: report the value of the lock that is already there.
		if ( ! $lock_result ) {
			$lock_result = \get_option( 'activitypub_async_batch_' . $key );
		}

		return $lock_result;
	}

	/**
	 * Unlocks processing for the async batch callback.
	 *
	 * @param string $key Serialized callback name.
	 */
	public static function unlock( $key ) {
		\delete_option( 'activitypub_async_batch_' . $key );
	}

	/**
	 * Whether the async batch callback is locked.
	 *
	 * A lock older than 1800 seconds is treated as stale: it is removed and reported as unlocked.
	 *
	 * @param string $key Serialized callback name.
	 * @return boolean
	 */
	public static function is_locked( $key ) {
		$lock = \get_option( 'activitypub_async_batch_' . $key );

		if ( ! $lock ) {
			return false;
		}

		$lock = (int) $lock;

		if ( $lock < \time() - 1800 ) {
			self::unlock( $key );
			return false;
		}

		return true;
	}

	/**
	 * Send announces.
	 *
	 * Adds an Announce by the Blog Actor to the outbox for a user's public Create
	 * activity and schedules it for federation.
	 *
	 * @param int      $outbox_activity_id The outbox activity ID.
	 * @param Activity $activity           The activity object.
	 * @param int      $actor_id           The actor ID.
	 * @param int      $content_visibility The content visibility.
	 */
	public static function schedule_announce_activity( $outbox_activity_id, $activity, $actor_id, $content_visibility ) {
		// Only if we're in both Blog and User modes.
		if ( ACTIVITYPUB_ACTOR_AND_BLOG_MODE !== \get_option( 'activitypub_actor_mode', ACTIVITYPUB_ACTOR_MODE ) ) {
			return;
		}

		// Only if this isn't the Blog Actor.
		if ( Actors::BLOG_USER_ID === $actor_id ) {
			return;
		}

		/*
		 * Only if the content visibility is public.
		 *
		 * NOTE(review): this comment used to read "public or quiet public", but only
		 * the public constant is accepted. Confirm which one is intended.
		 */
		if ( ACTIVITYPUB_CONTENT_VISIBILITY_PUBLIC !== $content_visibility ) {
			return;
		}

		// Only if the activity is a Create.
		if ( 'Create' !== $activity->get_type() ) {
			return;
		}

		$object = $activity->get_object();
		if ( ! is_object( $object ) ) {
			return;
		}

		// Check if the object is an article, image, audio, video, event, or document and ignore profile updates and other activities.
		if ( ! in_array( $object->get_type(), Base_Object::TYPES, true ) ) {
			return;
		}

		// Presumably the lookup can fail with a WP_Error; bail instead of calling a method on it.
		$blog_actor = Actors::get_by_id( Actors::BLOG_USER_ID );
		if ( ! \is_object( $blog_actor ) || \is_wp_error( $blog_actor ) ) {
			return;
		}

		$announce = new Activity();
		$announce->set_type( 'Announce' );
		$announce->set_actor( $blog_actor->get_id() );
		$announce->set_object( $activity );
		$announce->add_cc( object_to_uri( $activity->get_actor() ) );

		$announce_id = Outbox::add( $announce, Actors::BLOG_USER_ID );

		if ( ! $announce_id ) {
			return;
		}

		// Schedule the outbox item for federation.
		self::schedule_outbox_activity_for_federation( $announce_id, 120 );
	}
}