Atlas Topics

Atlas pre-organizes your data into topics informed by the latent contents of your embeddings. Visually, these are represented by regions of homogeneous color on an Atlas map.

You can access and operate on topics programmatically via the topics attribute of an AtlasMap.

from nomic import AtlasProject

map = AtlasProject(name='My Project').maps[0]

map.topics

AtlasMapTopics

Atlas Topics State

from nomic import AtlasProject

project = AtlasProject(name='My Project')
map = project.maps[0]
print(map.topics)
                id_      topic_depth_1       topic_depth_2          topic_depth_3
0     000262a5-2811  Space exploration      Hurricane Jeanne        Spacecraft Cassini
1     000c453d-ee97   English football      Athens 2004 Olympics    bobby rathore
...
9999  fffcc65c-38dc  Space exploration      Presidential elections  Blood
Source code in nomic/data_operations.py
class AtlasMapTopics:
    """
    Atlas Topics State

    === "Accessing Topics Example"
        ``` py
        from nomic import AtlasProject

        project = AtlasProject(name='My Project')
        map = project.maps[0]
        print(map.topics)
        ```
    === "Output"
        ```
                        id_      topic_depth_1       topic_depth_2          topic_depth_3
        0     000262a5-2811  Space exploration      Hurricane Jeanne        Spacecraft Cassini
        1     000c453d-ee97   English football      Athens 2004 Olympics    bobby rathore
        ...
        9999  fffcc65c-38dc  Space exploration      Presidential elections  Blood
        ```
    """

    def __init__(self, projection: "AtlasProjection"):
        self.projection = projection
        self.project = projection.project
        self.id_field = self.projection.project.id_field
        try:
            self._tb: pa.Table = projection._fetch_tiles().select(
                [self.id_field, '_topic_depth_1', '_topic_depth_2', '_topic_depth_3']
            ).rename_columns([self.id_field, 'topic_depth_1', 'topic_depth_2', 'topic_depth_3'])
        except pa.lib.ArrowInvalid as e:
            raise ValueError("Topic modeling has not yet been run on this map.") from e
        self._metadata = None
        self._hierarchy = None

    @property
    def df(self) -> pandas.DataFrame:
        """
        A pandas dataframe associating each datapoint on your map to their topics at each topic depth.
        """
        return self.tb.to_pandas()

    @property
    def tb(self) -> pa.Table:
        """
        Pyarrow table associating each datapoint on the map to their Atlas assigned topics.
        This table is memmapped from the underlying files and is the most efficient way to
        access topic information.
        """
        return self._tb

    @property
    def metadata(self) -> pandas.DataFrame:
        """
        Pandas dataframe where each row gives metadata for all map topics, including:

        - topic id
        - a human readable topic description
        - identifying keywords that differentiate the topic from other topics
        """
        if self._metadata is not None:
            return self._metadata

        response = requests.get(
            self.projection.project.atlas_api_path
            + "/v1/project/{}/index/projection/{}".format(
                self.projection.project.meta['id'], self.projection.projection_id
            ),
            headers=self.projection.project.header,
        )
        topics = json.loads(response.text)['topic_models'][0]['features']
        topic_data = [e['properties'] for e in topics]
        topic_data = pd.DataFrame(topic_data)
        topic_data = topic_data.rename(columns={"topic": "topic_id",
                                                '_topic_depth_1': 'topic_depth_1',
                                                '_topic_depth_2': 'topic_depth_2',
                                                '_topic_depth_3': 'topic_depth_3'})
        self._metadata = topic_data

        return topic_data

    @property
    def hierarchy(self) -> Dict:
        """
        A dictionary that allows iteration of the topic hierarchy. Each key is a topic that maps to its sub-topics.
        If a topic is not a key in the hierarchy, it is a leaf in the topic hierarchy.
        """
        if self._hierarchy is not None:
            return self._hierarchy

        topic_df = self.metadata

        topic_hierarchy = defaultdict(list)
        cols = ["topic_id", "topic_depth_1", "topic_depth_2", "topic_depth_3"]

        for _, row in topic_df[cols].iterrows():
            # Only consider the non-null values for each row
            topics = [topic for topic in row if pd.notna(topic)]

            # Iterate over the topics in each row, adding each topic to the
            # list of subtopics for the topic at the previous depth
            # (use a distinct loop variable so the row index is not shadowed)
            for depth in range(1, len(topics) - 1):
                if topics[depth + 1] not in topic_hierarchy[topics[depth]]:
                    topic_hierarchy[topics[depth]].append(topics[depth + 1])
        self._hierarchy = dict(topic_hierarchy)

        return self._hierarchy

    def group_by_topic(self, topic_depth: int = 1) -> List[Dict]:
        """
        Associates topics at a given depth in the topic hierarchy to the identifiers of their contained datapoints.

        Args:
            topic_depth: Topic depth to group datums by. Acceptable values
                currently are (1, 2, 3).
        Returns:
            List of dictionaries where each dictionary contains next depth
                subtopics, subtopic ids, topic_id, topic_short_description,
                topic_long_description, and list of datum_ids.
        """

        # TODO: This will need to be changed once topic depths become dynamic and not hard-coded
        if topic_depth not in (1, 2, 3):
            raise ValueError("Topic depth out of range.")

        # Unique datum id column to aggregate
        datum_id_col = self.project.meta["unique_id_field"]

        df = self.df

        topic_datum_dict = df.groupby(f"topic_depth_{topic_depth}")[datum_id_col].apply(set).to_dict()

        topic_df = self.metadata
        hierarchy = self.hierarchy

        result = []
        for topic, datum_ids in topic_datum_dict.items():
            # Encountered topic with zero datums
            if len(datum_ids) == 0:
                continue

            result_dict = {}
            topic_metadata = topic_df[topic_df["topic_short_description"] == topic]
            subtopics = []
            if topic in hierarchy:
                subtopics = hierarchy[topic]
            result_dict["subtopics"] = subtopics
            result_dict["subtopic_ids"] = topic_df[topic_df["topic_short_description"].isin(subtopics)][
                "topic_id"
            ].tolist()
            result_dict["topic_id"] = topic_metadata["topic_id"].item()
            result_dict["topic_short_description"] = topic_metadata["topic_short_description"].item()
            result_dict["topic_long_description"] = topic_metadata["topic_description"].item()
            result_dict["datum_ids"] = datum_ids
            result.append(result_dict)
        return result

    def get_topic_density(self, time_field: str, start: datetime, end: datetime):
        '''
        Computes the density/frequency of topics in a given interval of a timestamp field.

        Useful for answering questions such as:

        - What topics increased in prevalence between December and January?

        Args:
            time_field: Your metadata field containing isoformat timestamps
            start: A datetime object for the window start
            end: A datetime object for the window end

        Returns:
            List[{topic: str, count: int}] - A list of {topic, count} dictionaries, sorted from largest count to smallest count
        '''
        response = requests.post(
            self.project.atlas_api_path + "/v1/project/{}/topic_density".format(self.projection.atlas_index_id),
            headers=self.project.header,
            json={'start': start.isoformat(), 'end': end.isoformat(), 'time_field': time_field},
        )
        if response.status_code != 200:
            raise Exception(response.text)

        return response.json()

    def vector_search_topics(self, queries: np.ndarray, k: int = 32, depth: int = 3) -> Dict:
        '''
        Given an embedding, returns a normalized distribution over topics.

        Useful for answering questions such as:

        - What topic does my new datapoint belong to?
        - Does my datapoint belong to the "Dog" topic or the "Cat" topic?

        Args:
            queries: a 2d numpy array where each row corresponds to a query vector
            k: (Default 32) the number of neighbors to use when estimating the posterior
            depth: (Default 3) the topic depth at which you want to search

        Returns:
            A dict mapping {topic: posterior probability} for each query
        '''

        if queries.ndim != 2:
            raise ValueError(
                'Expected a 2 dimensional array. If you have a single query, we expect an array of shape (1, d).'
            )

        bytesio = io.BytesIO()
        np.save(bytesio, queries)

        response = requests.post(
            self.project.atlas_api_path + "/v1/project/data/get/embedding/topic",
            headers=self.project.header,
            json={
                'atlas_index_id': self.projection.atlas_index_id,
                'queries': base64.b64encode(bytesio.getvalue()).decode('utf-8'),
                'k': k,
                'depth': depth,
            },
        )

        if response.status_code != 200:
            raise Exception(response.text)

        return response.json()

    def __repr__(self) -> str:
        return str(self.df)
df: pandas.DataFrame property

A pandas dataframe associating each datapoint on your map to their topics at each topic depth.
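
For example, a quick way to count datapoints per top-level topic. This is a minimal sketch; it assumes a project named 'My Project' whose map has already had topic modeling run, as in the examples above:

``` py
from nomic import AtlasProject

map = AtlasProject(name='My Project').maps[0]

# One row per datapoint, with one column per topic depth
topics_df = map.topics.df
print(topics_df['topic_depth_1'].value_counts())
```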

hierarchy: Dict property

A dictionary that allows iteration of the topic hierarchy. Each key is a topic that maps to its sub-topics. If a topic is not a key in the hierarchy, it is a leaf in the topic hierarchy.
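
For example, to walk the hierarchy one level at a time (a minimal sketch; reuses the map from the sketch above):

``` py
hierarchy = map.topics.hierarchy

# Keys include topics at every non-leaf depth; topics that never
# appear as a key are leaves of the hierarchy.
for topic, subtopics in hierarchy.items():
    print(topic, '->', subtopics)
```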

metadata: pandas.DataFrame property

Pandas dataframe where each row gives metadata for all map topics (a short usage sketch follows the list), including:

  • topic id
  • a human readable topic description
  • identifying keywords that differentiate the topic from other topics
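
A minimal sketch of inspecting this metadata; the 'Space exploration' value is illustrative, taken from the example output earlier on this page:

``` py
metadata = map.topics.metadata

# Discover the exact column names (topic id, descriptions, keywords)
print(metadata.columns)

# Look up a single topic by its short description
row = metadata[metadata['topic_short_description'] == 'Space exploration']
print(row['topic_id'].item(), row['topic_description'].item())
```
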
tb: pa.Table property

Pyarrow table associating each datapoint on the map to their Atlas assigned topics. This table is memmapped from the underlying files and is the most efficient way to access topic information.
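
Because the table is memory-mapped, filtering with pyarrow's compute functions avoids materializing a full dataframe. A minimal sketch, reusing the map and the illustrative topic label from above:

``` py
import pyarrow.compute as pc

tb = map.topics.tb

# Vectorized filter over the memmapped column
mask = pc.equal(tb['topic_depth_1'], 'Space exploration')
print(tb.filter(mask).num_rows)
```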

get_topic_density(time_field, start, end)

Computes the density/frequency of topics in a given interval of a timestamp field.

Useful for answering questions such as:

  • What topics increased in prevalence between December and January?

Parameters:

  • time_field (str) –

    Your metadata field containing isoformat timestamps

  • start (datetime) –

    A datetime object for the window start

  • end (datetime) –

    A datetime object for the window end

Returns:

  • List[{topic: str, count: int}] - A list of {topic, count} dictionaries, sorted from largest count to smallest count
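
For example, to measure topic prevalence over a two-month window (a minimal sketch; 'timestamp' is a hypothetical metadata field holding isoformat timestamps):

``` py
from datetime import datetime

density = map.topics.get_topic_density(
    time_field='timestamp',  # hypothetical field name
    start=datetime(2022, 12, 1),
    end=datetime(2023, 1, 31),
)
# Results are sorted from largest count to smallest
for entry in density:
    print(entry['topic'], entry['count'])
```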

Source code in nomic/data_operations.py
def get_topic_density(self, time_field: str, start: datetime, end: datetime):
    '''
    Computes the density/frequency of topics in a given interval of a timestamp field.

    Useful for answering questions such as:

    - What topics increased in prevalence between December and January?

    Args:
        time_field: Your metadata field containing isoformat timestamps
        start: A datetime object for the window start
        end: A datetime object for the window end

    Returns:
        List[{topic: str, count: int}] - A list of {topic, count} dictionaries, sorted from largest count to smallest count
    '''
    response = requests.post(
        self.project.atlas_api_path + "/v1/project/{}/topic_density".format(self.projection.atlas_index_id),
        headers=self.project.header,
        json={'start': start.isoformat(), 'end': end.isoformat(), 'time_field': time_field},
    )
    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()
group_by_topic(topic_depth=1)

Associates topics at a given depth in the topic hierarchy to the identifiers of their contained datapoints.

Parameters:

  • topic_depth (int, default: 1 ) –

    Topic depth to group datums by. Acceptable values currently are (1, 2, 3).

Returns: List of dictionaries where each dictionary contains next depth subtopics, subtopic ids, topic_id, topic_short_description, topic_long_description, and list of datum_ids.
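
For example, to list each depth-1 topic with the number of datapoints it contains (a minimal sketch; reuses the map from above):

``` py
groups = map.topics.group_by_topic(topic_depth=1)

for group in groups:
    print(group['topic_short_description'], len(group['datum_ids']))
```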

Source code in nomic/data_operations.py
def group_by_topic(self, topic_depth: int = 1) -> List[Dict]:
    """
    Associates topics at a given depth in the topic hierarchy to the identifiers of their contained datapoints.

    Args:
        topic_depth: Topic depth to group datums by. Acceptable values
            currently are (1, 2, 3).
    Returns:
        List of dictionaries where each dictionary contains next depth
            subtopics, subtopic ids, topic_id, topic_short_description,
            topic_long_description, and list of datum_ids.
    """

    # TODO: This will need to be changed once topic depths become dynamic and not hard-coded
    if topic_depth not in (1, 2, 3):
        raise ValueError("Topic depth out of range.")

    # Unique datum id column to aggregate
    datum_id_col = self.project.meta["unique_id_field"]

    df = self.df

    topic_datum_dict = df.groupby(f"topic_depth_{topic_depth}")[datum_id_col].apply(set).to_dict()

    topic_df = self.metadata
    hierarchy = self.hierarchy

    result = []
    for topic, datum_ids in topic_datum_dict.items():
        # Encountered topic with zero datums
        if len(datum_ids) == 0:
            continue

        result_dict = {}
        topic_metadata = topic_df[topic_df["topic_short_description"] == topic]
        subtopics = []
        if topic in hierarchy:
            subtopics = hierarchy[topic]
        result_dict["subtopics"] = subtopics
        result_dict["subtopic_ids"] = topic_df[topic_df["topic_short_description"].isin(subtopics)][
            "topic_id"
        ].tolist()
        result_dict["topic_id"] = topic_metadata["topic_id"].item()
        result_dict["topic_short_description"] = topic_metadata["topic_short_description"].item()
        result_dict["topic_long_description"] = topic_metadata["topic_description"].item()
        result_dict["datum_ids"] = datum_ids
        result.append(result_dict)
    return result
vector_search_topics(queries, k=32, depth=3)

Given an embedding, returns a normalized distribution over topics.

Useful for answering questions such as:

  • What topic does my new datapoint belong to?
  • Does my datapoint belong to the "Dog" topic or the "Cat" topic?

Parameters:

  • queries (array) –

    a 2d numpy array where each row corresponds to a query vector

  • k (int, default: 32 ) –

    (Default 32) the number of neighbors to use when estimating the posterior

  • depth (int, default: 3 ) –

    (Default 3) the topic depth at which you want to search

Returns:

  • Dict

    A dict mapping {topic: posterior probability} for each query
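
For example, classifying a single new embedding against the map's topics (a minimal sketch; the dimension d = 768 is illustrative and must match your map's embedding dimension):

``` py
import numpy as np

# A single query still needs shape (1, d)
query = np.random.rand(1, 768).astype(np.float32)

distribution = map.topics.vector_search_topics(queries=query, k=32, depth=3)
print(distribution)
```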

Source code in nomic/data_operations.py
def vector_search_topics(self, queries: np.ndarray, k: int = 32, depth: int = 3) -> Dict:
    '''
    Given an embedding, returns a normalized distribution over topics.

    Useful for answering questions such as:

    - What topic does my new datapoint belong to?
    - Does my datapoint belong to the "Dog" topic or the "Cat" topic?

    Args:
        queries: a 2d numpy array where each row corresponds to a query vector
        k: (Default 32) the number of neighbors to use when estimating the posterior
        depth: (Default 3) the topic depth at which you want to search

    Returns:
        A dict mapping {topic: posterior probability} for each query
    '''

    if queries.ndim != 2:
        raise ValueError(
            'Expected a 2 dimensional array. If you have a single query, we expect an array of shape (1, d).'
        )

    bytesio = io.BytesIO()
    np.save(bytesio, queries)

    response = requests.post(
        self.project.atlas_api_path + "/v1/project/data/get/embedding/topic",
        headers=self.project.header,
        json={
            'atlas_index_id': self.projection.atlas_index_id,
            'queries': base64.b64encode(bytesio.getvalue()).decode('utf-8'),
            'k': k,
            'depth': depth,
        },
    )

    if response.status_code != 200:
        raise Exception(response.text)

    return response.json()