FeatureEnVi: Visual Analytics for Feature Engineering Using Stepwise Selection and Semi-Automatic Extraction Approaches https://doi.org/10.1109/TVCG.2022.3141040
FeatureEnVi/backend/venv/lib/python3.7/site-packages/pymongo/change_stream.py


# Copyright 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Watch changes on a collection, a database, or the entire cluster."""
import copy
from bson import _bson_to_dict
from bson.raw_bson import RawBSONDocument
from bson.son import SON
from pymongo import common
from pymongo.collation import validate_collation_or_none
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (ConnectionFailure,
InvalidOperation,
OperationFailure,
PyMongoError)
# The change streams spec considers the following server errors from the
# getMore command non-resumable. All other getMore errors are resumable.
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
11601, # Interrupted
136, # CappedPositionLost
237, # CursorKilled
None, # No error code was returned.
])
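
# Note (added comment): any getMore failure whose code is not in this set,
# as well as any network error, triggers a single automatic resume attempt
# in ChangeStream.try_next() before the error is surfaced to the caller.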


class ChangeStream(object):
    """The internal abstract base class for change stream cursors.

    Should not be called directly by application developers. Use
    :meth:`pymongo.collection.Collection.watch`,
    :meth:`pymongo.database.Database.watch`, or
    :meth:`pymongo.mongo_client.MongoClient.watch` instead.

    .. versionadded:: 3.6

    .. mongodoc:: changeStreams
    """
    def __init__(self, target, pipeline, full_document, resume_after,
                 max_await_time_ms, batch_size, collation,
                 start_at_operation_time, session):
        if pipeline is None:
            pipeline = []
        elif not isinstance(pipeline, list):
            raise TypeError("pipeline must be a list")

        common.validate_string_or_none('full_document', full_document)
        validate_collation_or_none(collation)
        common.validate_non_negative_integer_or_none("batchSize", batch_size)

        self._decode_custom = False
        self._orig_codec_options = target.codec_options
        if target.codec_options.type_registry._decoder_map:
            self._decode_custom = True
            # Keep the type registry so that we support encoding custom types
            # in the pipeline.
            self._target = target.with_options(
                codec_options=target.codec_options.with_options(
                    document_class=RawBSONDocument))
        else:
            self._target = target

        self._pipeline = copy.deepcopy(pipeline)
        self._full_document = full_document
        self._resume_token = copy.deepcopy(resume_after)
        self._max_await_time_ms = max_await_time_ms
        self._batch_size = batch_size
        self._collation = collation
        self._start_at_operation_time = start_at_operation_time
        self._session = session
        self._cursor = self._create_cursor()

    @property
    def _aggregation_target(self):
        """The argument to pass to the aggregate command."""
        raise NotImplementedError

    @property
    def _database(self):
        """The database against which the aggregation commands for
        this ChangeStream will be run. """
        raise NotImplementedError

    def _pipeline_options(self):
        options = {}
        if self._full_document is not None:
            options['fullDocument'] = self._full_document
        if self._resume_token is not None:
            options['resumeAfter'] = self._resume_token
        if self._start_at_operation_time is not None:
            options['startAtOperationTime'] = self._start_at_operation_time
        return options

    def _full_pipeline(self):
        """Return the full aggregation pipeline for this ChangeStream."""
        options = self._pipeline_options()
        full_pipeline = [{'$changeStream': options}]
        full_pipeline.extend(self._pipeline)
        return full_pipeline
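
    # Illustrative note (added comment): with pipeline=[{'$match':
    # {'operationType': 'insert'}}] and full_document='updateLookup',
    # _full_pipeline() returns
    #   [{'$changeStream': {'fullDocument': 'updateLookup'}},
    #    {'$match': {'operationType': 'insert'}}]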
    def _run_aggregation_cmd(self, session, explicit_session):
        """Run the full aggregation pipeline for this ChangeStream and return
        the corresponding CommandCursor.
        """
        read_preference = self._target._read_preference_for(session)
        client = self._database.client
        with client._socket_for_reads(read_preference) as (sock_info, slave_ok):
            pipeline = self._full_pipeline()
            cmd = SON([("aggregate", self._aggregation_target),
                       ("pipeline", pipeline),
                       ("cursor", {})])

            result = sock_info.command(
                self._database.name,
                cmd,
                slave_ok,
                read_preference,
                self._target.codec_options,
                parse_write_concern_error=True,
                read_concern=self._target.read_concern,
                collation=self._collation,
                session=session,
                client=self._database.client)

            cursor = result["cursor"]
            if (self._start_at_operation_time is None and
                    self._resume_token is None and
                    cursor.get("_id") is None and
                    sock_info.max_wire_version >= 7):
                self._start_at_operation_time = result["operationTime"]

            ns = cursor["ns"]
            _, collname = ns.split(".", 1)
            aggregation_collection = self._database.get_collection(
                collname, codec_options=self._target.codec_options,
                read_preference=read_preference,
                write_concern=self._target.write_concern,
                read_concern=self._target.read_concern
            )

            return CommandCursor(
                aggregation_collection, cursor, sock_info.address,
                batch_size=self._batch_size or 0,
                max_await_time_ms=self._max_await_time_ms,
                session=session, explicit_session=explicit_session)

    def _create_cursor(self):
        with self._database.client._tmp_session(self._session, close=False) as s:
            return self._run_aggregation_cmd(
                session=s,
                explicit_session=self._session is not None)
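
    # Note (added comment): _tmp_session() supplies an implicit session when
    # the caller did not pass one to watch(); explicit_session records whether
    # the session is caller-owned, so the CommandCursor does not end a session
    # that the application may still be using.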
    def _resume(self):
        """Reestablish this change stream after a resumable error."""
        try:
            self._cursor.close()
        except PyMongoError:
            pass
        self._cursor = self._create_cursor()

    def close(self):
        """Close this ChangeStream."""
        self._cursor.close()

    def __iter__(self):
        return self

    def next(self):
        """Advance the cursor.

        This method blocks until the next change document is returned or an
        unrecoverable error is raised. This method is used when iterating over
        all changes in the cursor. For example::

            try:
                with db.collection.watch(
                        [{'$match': {'operationType': 'insert'}}]) as stream:
                    for insert_change in stream:
                        print(insert_change)
            except pymongo.errors.PyMongoError:
                # The ChangeStream encountered an unrecoverable error or the
                # resume attempt failed to recreate the cursor.
                logging.error('...')

        Raises :exc:`StopIteration` if this ChangeStream is closed.
        """
        while self.alive:
            doc = self.try_next()
            if doc is not None:
                return doc

        raise StopIteration

    __next__ = next

    @property
    def alive(self):
        """Does this cursor have the potential to return more data?

        .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
            :exc:`StopIteration` and :meth:`try_next` can return ``None``.

        .. versionadded:: 3.8
        """
        return self._cursor.alive

    def try_next(self):
        """Advance the cursor without blocking indefinitely.

        This method returns the next change document without waiting
        indefinitely for the next change. For example::

            with db.collection.watch() as stream:
                while stream.alive:
                    change = stream.try_next()
                    if change is not None:
                        print(change)
                    elif stream.alive:
                        # We end up here when there are no recent changes.
                        # Sleep for a while to avoid flooding the server with
                        # getMore requests when no changes are available.
                        time.sleep(10)

        If no change document is cached locally then this method runs a single
        getMore command. If the getMore yields any documents, the next
        document is returned, otherwise, if the getMore returns no documents
        (because there have been no changes) then ``None`` is returned.

        :Returns:
          The next change document or ``None`` when no document is available
          after running a single getMore or when the cursor is closed.

        .. versionadded:: 3.8
        """
        # Attempt to get the next change with at most one getMore and at most
        # one resume attempt.
        try:
            change = self._cursor._try_next(True)
        except ConnectionFailure:
            self._resume()
            change = self._cursor._try_next(False)
        except OperationFailure as exc:
            if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
                raise
            self._resume()
            change = self._cursor._try_next(False)

        # No changes are available.
        if change is None:
            return None

        try:
            resume_token = change['_id']
        except KeyError:
            self.close()
            raise InvalidOperation(
                "Cannot provide resume functionality when the resume "
                "token is missing.")

        self._resume_token = copy.copy(resume_token)
        self._start_at_operation_time = None

        if self._decode_custom:
            return _bson_to_dict(change.raw, self._orig_codec_options)
        return change
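
    # Note (added comment): caching change['_id'] above means a later
    # _resume() reissues the aggregation with 'resumeAfter' pointing at the
    # last change the application actually received, so no events are skipped.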
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class CollectionChangeStream(ChangeStream):
    """A change stream that watches changes on a single collection.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.collection.Collection.watch` instead.

    .. versionadded:: 3.7
    """
    @property
    def _aggregation_target(self):
        return self._target.name

    @property
    def _database(self):
        return self._target.database


class DatabaseChangeStream(ChangeStream):
    """A change stream that watches changes on all collections in a database.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.database.Database.watch` instead.

    .. versionadded:: 3.7
    """
    @property
    def _aggregation_target(self):
        return 1

    @property
    def _database(self):
        return self._target


class ClusterChangeStream(DatabaseChangeStream):
    """A change stream that watches changes on all collections in the cluster.

    Should not be called directly by application developers. Use
    helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.

    .. versionadded:: 3.7
    """
    def _pipeline_options(self):
        options = super(ClusterChangeStream, self)._pipeline_options()
        options["allChangesForCluster"] = True
        return options
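
Usage sketch (not part of the upstream module): a minimal example of how
application code is expected to drive this ChangeStream API through the
public watch() helpers, including resuming after a restart. The URI and
namespace are illustrative assumptions; change streams require a replica
set or sharded cluster.

    import pymongo
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # assumed URI
    coll = client.demo_db.events                       # assumed namespace

    resume_token = None  # persist this externally to survive restarts
    try:
        # Collection.watch() builds a CollectionChangeStream; resume_after
        # makes the new stream continue where a previous one stopped.
        with coll.watch([{'$match': {'operationType': 'insert'}}],
                        resume_after=resume_token) as stream:
            for change in stream:
                # Each change document's '_id' is its resume token.
                resume_token = change['_id']
                print(change)
    except pymongo.errors.PyMongoError:
        # A non-resumable error occurred, or the automatic resume attempt
        # in ChangeStream._resume() failed to recreate the cursor.
        pass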