
Reference

whyhow_rbr.embedding module

whyhow_rbr.embedding.generate_embeddings(openai_api_key, chunks, model='text-embedding-3-small')

Generate embeddings for a list of chunks.

Parameters:

Name Type Description Default
openai_api_key str

OpenAI API key.

required
chunks list[str]

List of chunks to generate embeddings for.

required
model str

OpenAI model to use for generating embeddings.

'text-embedding-3-small'

Returns:

Type Description
list[list[float]]

List of embeddings for each chunk.

Source code in whyhow_rbr/embedding.py
def generate_embeddings(
    openai_api_key: str,
    chunks: list[str],
    model: str = "text-embedding-3-small",
) -> list[list[float]]:
    """Generate embeddings for a list of chunks.

    Parameters
    ----------
    openai_api_key : str
        OpenAI API key.

    chunks : list[str]
        List of chunks to generate embeddings for.

    model : str
        OpenAI model to use for generating embeddings.

    Returns
    -------
    list[list[float]]
        List of embeddings for each chunk.

    """
    embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key, model=model)  # type: ignore[call-arg]
    embeddings_array = embeddings.embed_documents(chunks)

    return embeddings_array
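
Example: a minimal usage sketch, assuming OPENAI_API_KEY is set in the environment; the chunk strings are placeholders.

import os

from whyhow_rbr.embedding import generate_embeddings

# Placeholder chunks; any list of strings works.
chunks = [
    "Rule-based retrieval narrows the search space.",
    "Embeddings map text to vectors.",
]

vectors = generate_embeddings(
    openai_api_key=os.environ["OPENAI_API_KEY"],
    chunks=chunks,
    model="text-embedding-3-small",
)

print(len(vectors), len(vectors[0]))  # 2 embeddings, 1536 dimensions each for this model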

whyhow_rbr.exceptions module

whyhow_rbr.exceptions.IndexAlreadyExistsException

Bases: Exception

Raised when the index already exists.

Source code in whyhow_rbr/exceptions.py
class IndexAlreadyExistsException(Exception):
    """Raised when the index already exists."""

    pass

whyhow_rbr.exceptions.IndexNotFoundException

Bases: Exception

Raised when the index is not found.

Source code in whyhow_rbr/exceptions.py
class IndexNotFoundException(Exception):
    """Raised when the index is not found."""

    pass

whyhow_rbr.exceptions.OpenAIException

Bases: Exception

Raised when the OpenAI API returns an error.

Source code in whyhow_rbr/exceptions.py
class OpenAIException(Exception):
    """Raised when the OpenAI API returns an error."""

    pass
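
Example: a sketch of how calling code might handle these exceptions around the Client documented below; the index name is a placeholder.

from whyhow_rbr.exceptions import IndexAlreadyExistsException
from whyhow_rbr.rag import Client

client = Client()  # reads OPENAI_API_KEY and PINECONE_API_KEY from the environment

try:
    index = client.create_index("amazing-index")  # placeholder index name
except IndexAlreadyExistsException:
    index = client.get_index("amazing-index")  # reuse the existing index instead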

whyhow_rbr.processing module

whyhow_rbr.processing.parse_and_split(path, chunk_size=512, chunk_overlap=100)

Parse a PDF and split it into chunks.

Parameters:

Name Type Description Default
path str or Path

Path to the document to process.

required
chunk_size int

Size of the chunks.

512
chunk_overlap int

Overlap between chunks.

100

Returns:

Type Description
list[Document]

The chunks of the PDF.

Source code in whyhow_rbr/processing.py
def parse_and_split(
    path: str | pathlib.Path,
    chunk_size: int = 512,
    chunk_overlap: int = 100,
) -> list[Document]:
    """Parse a PDF and split it into chunks.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to the document to process.

    chunk_size : int
        Size of the chunks.

    chunk_overlap : int
        Overlap between chunks.

    Returns
    -------
    list[Document]
        The chunks of the pdf.
    """
    loader = PyPDFLoader(str(path))
    docs = loader.load()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = splitter.split_documents(docs)

    # Assign the chunk number (within a page) to each chunk
    i_page = 0
    i_chunk = 0

    for chunk in chunks:
        if chunk.metadata["page"] != i_page:
            i_page = chunk.metadata["page"]
            i_chunk = 0

        chunk.metadata["chunk"] = i_chunk
        i_chunk += 1

    return chunks
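
Example: a small usage sketch with a placeholder PDF path, showing the page and chunk metadata attached to each chunk.

from whyhow_rbr.processing import parse_and_split

chunks = parse_and_split("reports/annual_report.pdf", chunk_size=512, chunk_overlap=100)

for chunk in chunks[:3]:
    meta = chunk.metadata
    print(meta["source"], meta["page"], meta["chunk"], len(chunk.page_content))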

whyhow_rbr.processing.clean_chunks(chunks)

Clean the chunks of a PDF.

No modifications in-place.

Parameters:

Name Type Description Default
chunks list[Document]

The chunks of the PDF.

required

Returns:

Type Description
list[Document]

The cleaned chunks.

Source code in whyhow_rbr/processing.py
def clean_chunks(
    chunks: list[Document],
) -> list[Document]:
    """Clean the chunks of a pdf.

    No modifications in-place.

    Parameters
    ----------
    chunks : list[Document]
        The chunks of the pdf.

    Returns
    -------
    list[Document]
        The cleaned chunks.
    """
    pattern = re.compile(r"(\r\n|\n|\r)")
    clean_chunks: list[Document] = []

    for chunk in chunks:
        text = re.sub(pattern, "", chunk.page_content)
        new_chunk = Document(
            page_content=text,
            metadata=copy.deepcopy(chunk.metadata),
        )

        clean_chunks.append(new_chunk)

    return clean_chunks
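
Example: clean_chunks is typically chained after parse_and_split; a minimal sketch with a placeholder path, showing that the input chunks are copied rather than modified in place.

from whyhow_rbr.processing import clean_chunks, parse_and_split

raw_chunks = parse_and_split("reports/annual_report.pdf")
cleaned = clean_chunks(raw_chunks)

# Newlines are stripped from the copies only; the original list is untouched.
print("\n" in raw_chunks[0].page_content, "\n" in cleaned[0].page_content)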

whyhow_rbr.rag module

whyhow_rbr.rag.Client

Synchronous client.

Source code in whyhow_rbr/rag.py
class Client:
    """Synchronous client."""

    def __init__(
        self,
        openai_api_key: str | None = None,
        pinecone_api_key: str | None = None,
    ):
        if openai_api_key is None:
            openai_api_key = os.environ.get("OPENAI_API_KEY")
            if openai_api_key is None:
                raise ValueError(
                    "No OPENAI_API_KEY provided must be provided."
                )

        if pinecone_api_key is None:
            pinecone_api_key = os.environ.get("PINECONE_API_KEY")
            if pinecone_api_key is None:
                raise ValueError("No PINECONE_API_KEY provided")

        self.openai_client = OpenAI(api_key=openai_api_key)
        self.pinecone_client = Pinecone(api_key=pinecone_api_key)

    def get_index(self, name: str) -> Index:
        """Get an existing index.

        Parameters
        ----------
        name : str
            The name of the index.


        Returns
        -------
        Index
            The index.

        Raises
        ------
        IndexNotFoundException
            If the index does not exist.

        """
        try:
            index = self.pinecone_client.Index(name)
        except NotFoundException as e:
            raise IndexNotFoundException(f"Index {name} does not exist") from e

        return index

    def create_index(
        self,
        name: str,
        dimension: int = 1536,
        metric: Metric = "cosine",
        spec: ServerlessSpec | PodSpec | None = None,
    ) -> Index:
        """Create a new index.

        If the index does not exist, it creates a new index with the specified parameters.

        Parameters
        ----------
        name : str
            The name of the index.

        dimension : int
            The dimension of the index.

        metric : Metric
            The metric of the index.

        spec : ServerlessSpec | PodSpec | None
            The spec of the index. If None, it uses the default spec.

        Raises
        ------
        IndexAlreadyExistsException
            If the index already exists.

        """
        try:
            self.get_index(name)
        except IndexNotFoundException:
            pass
        else:
            raise IndexAlreadyExistsException(f"Index {name} already exists")

        if spec is None:
            spec = DEFAULT_SPEC
            logger.info(f"Using default spec {spec}")

        self.pinecone_client.create_index(
            name=name, dimension=dimension, metric=metric, spec=spec
        )
        index = self.pinecone_client.Index(name)

        return index

    def upload_documents(
        self,
        index: Index,
        documents: list[str | pathlib.Path],
        namespace: str,
        embedding_model: str = "text-embedding-3-small",
        batch_size: int = 100,
    ) -> None:
        """Upload documents to the index.

        Parameters
        ----------
        index : Index
            The index.

        documents : list[str | pathlib.Path]
            The documents to upload.

        namespace : str
            The namespace within the index to use.

        batch_size : int
            The number of documents to upload at a time.

        embedding_model : str
            The OpenAI embedding model to use.

        """
        # don't allow for duplicate documents
        documents = list(set(documents))
        if not documents:
            logger.info("No documents to upload")
            return

        logger.info(f"Parsing {len(documents)} documents")
        all_chunks: list[Document] = []
        for document in documents:
            chunks_ = parse_and_split(document)
            chunks = clean_chunks(chunks_)
            all_chunks.extend(chunks)

        logger.info(f"Embedding {len(all_chunks)} chunks")
        embeddings = generate_embeddings(
            openai_api_key=self.openai_client.api_key,
            chunks=[c.page_content for c in all_chunks],
            model=embedding_model,
        )

        if len(embeddings) != len(all_chunks):
            raise ValueError(
                "Number of embeddings does not match number of chunks"
            )

        # create PineconeDocuments
        pinecone_documents = []
        for i, (chunk, embedding) in enumerate(zip(all_chunks, embeddings)):
            metadata = PineconeMetadata(
                text=chunk.page_content,
                page_number=chunk.metadata["page"],
                chunk_number=chunk.metadata["chunk"],
                filename=chunk.metadata["source"],
            )
            pinecone_document = PineconeDocument(
                values=embedding,
                metadata=metadata,
            )
            pinecone_documents.append(pinecone_document)

        upsert_documents = [d.model_dump() for d in pinecone_documents]

        response = index.upsert(
            upsert_documents, namespace=namespace, batch_size=batch_size
        )
        n_upserted = response["upserted_count"]
        logger.info(f"Upserted {n_upserted} documents")

    def clean_text(self, text: str) -> str:
        """Return a lower case version of text with punctuation removed.

        Parameters
        ----------
        text : str
            The raw text to be cleaned.

        Returns
        -------
        str
            The cleaned text string.
        """
        text_processed = re.sub("[^0-9a-zA-Z ]+", "", text.lower())
        text_processed_further = re.sub(" +", " ", text_processed)
        return text_processed_further

    def query(
        self,
        question: str,
        index: Index,
        namespace: str,
        rules: list[Rule] | None = None,
        top_k: int = 5,
        chat_model: str = "gpt-4-1106-preview",
        chat_temperature: float = 0.0,
        chat_max_tokens: int = 1000,
        chat_seed: int = 2,
        embedding_model: str = "text-embedding-3-small",
        process_rules_separately: bool = False,
        keyword_trigger: bool = False,
    ) -> QueryReturnType:
        """Query the index.

        Parameters
        ----------
        question : str
            The question to ask.

        index : Index
            The index to query.

        namespace : str
            The namespace within the index to use.

        rules : list[Rule] | None
            The rules to use for filtering the documents.

        top_k : int
            The number of matches to return per rule.

        chat_model : str
            The OpenAI chat model to use.

        chat_temperature : float
            The temperature for the chat model.

        chat_max_tokens : int
            The maximum number of tokens for the chat model.

        chat_seed : int
            The seed for the chat model.

        embedding_model : str
            The OpenAI embedding model to use.

        process_rules_separately : bool, optional
            Whether to process each rule individually and combine the results at the end.
            When set to True, each rule will be run independently, ensuring that every rule
            returns results. When set to False (default), all rules will be run as one joined
            query, potentially allowing one rule to dominate the others.
            Default is False.

        keyword_trigger : bool, optional
            Whether to trigger rules based on keyword matches in the question.
            Default is False.

        Returns
        -------
        QueryReturnType
            Dictionary with keys "answer", "matches", and "used_contexts".
            The "answer" is the answer to the question.
            The "matches" are the "top_k" matches from the index.
            The "used_contexts" are the indices of the matches
            that were actually used to answer the question.

        Raises
        ------
        OpenAIException
            If there is an error with the OpenAI API. Some possible reasons
            include the chat model not finishing or the response not being
            valid JSON.
        """
        logger.info(f"Raw rules: {rules}")

        if rules is None:
            rules = []

        if keyword_trigger:
            triggered_rules = []
            clean_question = self.clean_text(question).split(" ")

            for rule in rules:
                if rule.keywords:
                    clean_keywords = [
                        self.clean_text(keyword) for keyword in rule.keywords
                    ]

                    if bool(set(clean_keywords) & set(clean_question)):
                        triggered_rules.append(rule)

            rules = triggered_rules

        rule_filters = [rule.to_filter() for rule in rules if rule is not None]

        question_embedding = generate_embeddings(
            openai_api_key=self.openai_client.api_key,
            chunks=[question],
            model=embedding_model,
        )[0]

        matches = (
            []
        )  # Initialize matches outside the loop to collect matches from all queries
        match_texts = []

        # Check if there are any rule filters, and if not, proceed with a default query
        if not rule_filters:
            # Perform a default query
            query_response = index.query(
                namespace=namespace,
                top_k=top_k,
                vector=question_embedding,
                filter=None,  # No specific filter, or you can define a default filter as per your application's logic
                include_metadata=True,
            )
            matches = [
                PineconeMatch(**m.to_dict()) for m in query_response["matches"]
            ]
            match_texts = [m.metadata.text for m in matches]

        else:

            if process_rules_separately:
                for rule_filter in rule_filters:
                    if rule_filter:
                        query_response = index.query(
                            namespace=namespace,
                            top_k=top_k,
                            vector=question_embedding,
                            filter=rule_filter,
                            include_metadata=True,
                        )
                        matches.extend(
                            [
                                PineconeMatch(**m.to_dict())
                                for m in query_response["matches"]
                            ]
                        )
                        match_texts += [m.metadata.text for m in matches]
                match_texts = list(
                    set(match_texts)
                )  # Ensure unique match texts
            else:
                if rule_filters:
                    combined_filters = []
                    for rule_filter in rule_filters:
                        if rule_filter:
                            combined_filters.append(rule_filter)

                    rule_filter = (
                        {"$or": combined_filters} if combined_filters else None
                    )
                else:
                    rule_filter = None  # Fallback to a default query when no rules are provided or valid

                if rule_filter is not None:
                    query_response = index.query(
                        namespace=namespace,
                        top_k=top_k,
                        vector=question_embedding,
                        filter=rule_filter,
                        include_metadata=True,
                    )
                    matches = [
                        PineconeMatch(**m.to_dict())
                        for m in query_response["matches"]
                    ]
                    match_texts = [m.metadata.text for m in matches]

        # Proceed to create prompt, send it to OpenAI, and handle the response
        prompt = self.create_prompt(question, match_texts)
        response = self.openai_client.chat.completions.create(
            model=chat_model,
            seed=chat_seed,
            temperature=chat_temperature,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=chat_max_tokens,
        )

        output = self.process_response(response)

        return_dict: QueryReturnType = {
            "answer": output.answer,
            "matches": [m.model_dump() for m in matches],
            "used_contexts": output.contexts,
        }

        return return_dict

    def create_prompt(self, question: str, match_texts: list[str]) -> str:
        """Create the prompt for the OpenAI chat completion.

        Parameters
        ----------
        question : str
            The question to ask.

        match_texts : list[str]
            The list of context strings to include in the prompt.

        Returns
        -------
        str
            The generated prompt.
        """
        input_actual = Input(question=question, contexts=match_texts)
        prompt_end = f"""
        ACTUAL INPUT
        ```json
        {input_actual.model_dump_json()}
        ```

        ACTUAL OUTPUT
        """
        return f"{PROMPT_START}\n{prompt_end}"

    def process_response(self, response: Any) -> Output:
        """Process the OpenAI chat completion response.

        Parameters
        ----------
        response : Any
            The OpenAI chat completion response.

        Returns
        -------
        Output
            The processed output.

        Raises
        ------
        OpenAIException
            If the chat model did not finish or the response is not valid JSON.
        """
        choice = response.choices[0]
        if choice.finish_reason != "stop":
            raise OpenAIException(
                f"Chat did not finish. Reason: {choice.finish_reason}"
            )

        response_raw = cast(str, response.choices[0].message.content)

        if response_raw.startswith("```json"):
            start_i = response_raw.index("{")
            end_i = response_raw.rindex("}")
            response_raw = response_raw[start_i : end_i + 1]

        try:
            output = Output.model_validate_json(response_raw)
        except ValidationError as e:
            raise OpenAIException(
                f"OpenAI did not return a valid JSON: {response_raw}"
            ) from e

        return output
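
Example: an end-to-end usage sketch for the synchronous client; the index name, namespace, and document path are placeholders, and the API keys are read from the environment.

from whyhow_rbr.rag import Client, Rule

client = Client()  # uses OPENAI_API_KEY and PINECONE_API_KEY

index = client.create_index("amazing-index")  # raises IndexAlreadyExistsException if it exists

client.upload_documents(
    index=index,
    documents=["reports/annual_report.pdf"],
    namespace="default",
)

result = client.query(
    question="What was the revenue in 2023?",
    index=index,
    namespace="default",
    rules=[Rule(filename="reports/annual_report.pdf", page_numbers=[1, 2])],
)

print(result["answer"])
print(result["used_contexts"])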

clean_text(text)

Return a lower case version of text with punctuation removed.

Parameters:

Name Type Description Default
text str

The raw text to be cleaned.

required

Returns:

Type Description
str

The cleaned text string.

Source code in whyhow_rbr/rag.py
def clean_text(self, text: str) -> str:
    """Return a lower case version of text with punctuation removed.

    Parameters
    ----------
    text : str
        The raw text to be cleaned.

    Returns
    -------
    str
        The cleaned text string.
    """
    text_processed = re.sub("[^0-9a-zA-Z ]+", "", text.lower())
    text_processed_further = re.sub(" +", " ", text_processed)
    return text_processed_further

create_index(name, dimension=1536, metric='cosine', spec=None)

Create a new index.

If the index does not exist, it creates a new index with the specified parameters.

Parameters:

Name Type Description Default
name str

The name of the index.

required
dimension int

The dimension of the index.

1536
metric Metric

The metric of the index.

'cosine'
spec ServerlessSpec | PodSpec | None

The spec of the index. If None, it uses the default spec.

None

Raises:

Type Description
IndexAlreadyExistsException

If the index already exists.

Source code in whyhow_rbr/rag.py
def create_index(
    self,
    name: str,
    dimension: int = 1536,
    metric: Metric = "cosine",
    spec: ServerlessSpec | PodSpec | None = None,
) -> Index:
    """Create a new index.

    If the index does not exist, it creates a new index with the specified parameters.

    Parameters
    ----------
    name : str
        The name of the index.

    dimension : int
        The dimension of the index.

    metric : Metric
        The metric of the index.

    spec : ServerlessSpec | PodSpec | None
        The spec of the index. If None, it uses the default spec.

    Raises
    ------
    IndexAlreadyExistsException
        If the index already exists.

    """
    try:
        self.get_index(name)
    except IndexNotFoundException:
        pass
    else:
        raise IndexAlreadyExistsException(f"Index {name} already exists")

    if spec is None:
        spec = DEFAULT_SPEC
        logger.info(f"Using default spec {spec}")

    self.pinecone_client.create_index(
        name=name, dimension=dimension, metric=metric, spec=spec
    )
    index = self.pinecone_client.Index(name)

    return index
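
Example: a sketch of passing an explicit serverless spec instead of DEFAULT_SPEC; the cloud and region values are placeholders, and ServerlessSpec comes from the pinecone package.

from pinecone import ServerlessSpec

from whyhow_rbr.rag import Client

client = Client()
index = client.create_index(
    name="amazing-index",  # placeholder name
    dimension=1536,        # matches text-embedding-3-small
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)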

create_prompt(question, match_texts)

Create the prompt for the OpenAI chat completion.

Parameters:

Name Type Description Default
question str

The question to ask.

required
match_texts list[str]

The list of context strings to include in the prompt.

required

Returns:

Type Description
str

The generated prompt.

Source code in whyhow_rbr/rag.py
def create_prompt(self, question: str, match_texts: list[str]) -> str:
    """Create the prompt for the OpenAI chat completion.

    Parameters
    ----------
    question : str
        The question to ask.

    match_texts : list[str]
        The list of context strings to include in the prompt.

    Returns
    -------
    str
        The generated prompt.
    """
    input_actual = Input(question=question, contexts=match_texts)
    prompt_end = f"""
    ACTUAL INPUT
    ```json
    {input_actual.model_dump_json()}
    ```

    ACTUAL OUTPUT
    """
    return f"{PROMPT_START}\n{prompt_end}"

get_index(name)

Get an existing index.

Parameters:

Name Type Description Default
name str

The name of the index.

required

Returns:

Type Description
Index

The index.

Raises:

Type Description
IndexNotFoundException

If the index does not exist.

Source code in whyhow_rbr/rag.py
def get_index(self, name: str) -> Index:
    """Get an existing index.

    Parameters
    ----------
    name : str
        The name of the index.


    Returns
    -------
    Index
        The index.

    Raises
    ------
    IndexNotFoundException
        If the index does not exist.

    """
    try:
        index = self.pinecone_client.Index(name)
    except NotFoundException as e:
        raise IndexNotFoundException(f"Index {name} does not exist") from e

    return index

process_response(response)

Process the OpenAI chat completion response.

Parameters:

Name Type Description Default
response Any

The OpenAI chat completion response.

required

Returns:

Type Description
Output

The processed output.

Raises:

Type Description
OpenAIException

If the chat model did not finish or the response is not valid JSON.

Source code in whyhow_rbr/rag.py
def process_response(self, response: Any) -> Output:
    """Process the OpenAI chat completion response.

    Parameters
    ----------
    response : Any
        The OpenAI chat completion response.

    Returns
    -------
    Output
        The processed output.

    Raises
    ------
    OpenAIException
        If the chat model did not finish or the response is not valid JSON.
    """
    choice = response.choices[0]
    if choice.finish_reason != "stop":
        raise OpenAIException(
            f"Chat did not finish. Reason: {choice.finish_reason}"
        )

    response_raw = cast(str, response.choices[0].message.content)

    if response_raw.startswith("```json"):
        start_i = response_raw.index("{")
        end_i = response_raw.rindex("}")
        response_raw = response_raw[start_i : end_i + 1]

    try:
        output = Output.model_validate_json(response_raw)
    except ValidationError as e:
        raise OpenAIException(
            f"OpenAI did not return a valid JSON: {response_raw}"
        ) from e

    return output

query(question, index, namespace, rules=None, top_k=5, chat_model='gpt-4-1106-preview', chat_temperature=0.0, chat_max_tokens=1000, chat_seed=2, embedding_model='text-embedding-3-small', process_rules_separately=False, keyword_trigger=False)

Query the index.

Parameters:

Name Type Description Default
question str

The question to ask.

required
index Index

The index to query.

required
namespace str

The namespace within the index to use.

required
rules list[Rule] | None

The rules to use for filtering the documents.

None
top_k int

The number of matches to return per rule.

5
chat_model str

The OpenAI chat model to use.

'gpt-4-1106-preview'
chat_temperature float

The temperature for the chat model.

0.0
chat_max_tokens int

The maximum number of tokens for the chat model.

1000
chat_seed int

The seed for the chat model.

2
embedding_model str

The OpenAI embedding model to use.

'text-embedding-3-small'
process_rules_separately bool

Whether to process each rule individually and combine the results at the end. When set to True, each rule will be run independently, ensuring that every rule returns results. When set to False (default), all rules will be run as one joined query, potentially allowing one rule to dominate the others. Default is False.

False
keyword_trigger bool

Whether to trigger rules based on keyword matches in the question. Default is False.

False

Returns:

Type Description
QueryReturnType

Dictionary with keys "answer", "matches", and "used_contexts". The "answer" is the answer to the question. The "matches" are the "top_k" matches from the index. The "used_contexts" are the indices of the matches that were actually used to answer the question.

Raises:

Type Description
OpenAIException

If there is an error with the OpenAI API. Some possible reasons include the chat model not finishing or the response not being valid JSON.

Source code in whyhow_rbr/rag.py
def query(
    self,
    question: str,
    index: Index,
    namespace: str,
    rules: list[Rule] | None = None,
    top_k: int = 5,
    chat_model: str = "gpt-4-1106-preview",
    chat_temperature: float = 0.0,
    chat_max_tokens: int = 1000,
    chat_seed: int = 2,
    embedding_model: str = "text-embedding-3-small",
    process_rules_separately: bool = False,
    keyword_trigger: bool = False,
) -> QueryReturnType:
    """Query the index.

    Parameters
    ----------
    question : str
        The question to ask.

    index : Index
        The index to query.

    namespace : str
        The namespace within the index to use.

    rules : list[Rule] | None
        The rules to use for filtering the documents.

    top_k : int
        The number of matches to return per rule.

    chat_model : str
        The OpenAI chat model to use.

    chat_temperature : float
        The temperature for the chat model.

    chat_max_tokens : int
        The maximum number of tokens for the chat model.

    chat_seed : int
        The seed for the chat model.

    embedding_model : str
        The OpenAI embedding model to use.

    process_rules_separately : bool, optional
        Whether to process each rule individually and combine the results at the end.
        When set to True, each rule will be run independently, ensuring that every rule
        returns results. When set to False (default), all rules will be run as one joined
        query, potentially allowing one rule to dominate the others.
        Default is False.

    keyword_trigger : bool, optional
        Whether to trigger rules based on keyword matches in the question.
        Default is False.

    Returns
    -------
    QueryReturnType
        Dictionary with keys "answer", "matches", and "used_contexts".
        The "answer" is the answer to the question.
        The "matches" are the "top_k" matches from the index.
        The "used_contexts" are the indices of the matches
        that were actually used to answer the question.

    Raises
    ------
    OpenAIException
        If there is an error with the OpenAI API. Some possible reasons
        include the chat model not finishing or the response not being
        valid JSON.
    """
    logger.info(f"Raw rules: {rules}")

    if rules is None:
        rules = []

    if keyword_trigger:
        triggered_rules = []
        clean_question = self.clean_text(question).split(" ")

        for rule in rules:
            if rule.keywords:
                clean_keywords = [
                    self.clean_text(keyword) for keyword in rule.keywords
                ]

                if bool(set(clean_keywords) & set(clean_question)):
                    triggered_rules.append(rule)

        rules = triggered_rules

    rule_filters = [rule.to_filter() for rule in rules if rule is not None]

    question_embedding = generate_embeddings(
        openai_api_key=self.openai_client.api_key,
        chunks=[question],
        model=embedding_model,
    )[0]

    matches = (
        []
    )  # Initialize matches outside the loop to collect matches from all queries
    match_texts = []

    # Check if there are any rule filters, and if not, proceed with a default query
    if not rule_filters:
        # Perform a default query
        query_response = index.query(
            namespace=namespace,
            top_k=top_k,
            vector=question_embedding,
            filter=None,  # No specific filter, or you can define a default filter as per your application's logic
            include_metadata=True,
        )
        matches = [
            PineconeMatch(**m.to_dict()) for m in query_response["matches"]
        ]
        match_texts = [m.metadata.text for m in matches]

    else:

        if process_rules_separately:
            for rule_filter in rule_filters:
                if rule_filter:
                    query_response = index.query(
                        namespace=namespace,
                        top_k=top_k,
                        vector=question_embedding,
                        filter=rule_filter,
                        include_metadata=True,
                    )
                    matches.extend(
                        [
                            PineconeMatch(**m.to_dict())
                            for m in query_response["matches"]
                        ]
                    )
                    match_texts += [m.metadata.text for m in matches]
            match_texts = list(
                set(match_texts)
            )  # Ensure unique match texts
        else:
            if rule_filters:
                combined_filters = []
                for rule_filter in rule_filters:
                    if rule_filter:
                        combined_filters.append(rule_filter)

                rule_filter = (
                    {"$or": combined_filters} if combined_filters else None
                )
            else:
                rule_filter = None  # Fallback to a default query when no rules are provided or valid

            if rule_filter is not None:
                query_response = index.query(
                    namespace=namespace,
                    top_k=top_k,
                    vector=question_embedding,
                    filter=rule_filter,
                    include_metadata=True,
                )
                matches = [
                    PineconeMatch(**m.to_dict())
                    for m in query_response["matches"]
                ]
                match_texts = [m.metadata.text for m in matches]

    # Proceed to create prompt, send it to OpenAI, and handle the response
    prompt = self.create_prompt(question, match_texts)
    response = self.openai_client.chat.completions.create(
        model=chat_model,
        seed=chat_seed,
        temperature=chat_temperature,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=chat_max_tokens,
    )

    output = self.process_response(response)

    return_dict: QueryReturnType = {
        "answer": output.answer,
        "matches": [m.model_dump() for m in matches],
        "used_contexts": output.contexts,
    }

    return return_dict
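
Example: a sketch of the rule-processing options, continuing the client and index set up earlier; the filenames are placeholders. With keyword_trigger=True only rules whose keywords appear in the question are applied, and with process_rules_separately=True each triggered rule runs as its own Pinecone query.

from whyhow_rbr.rag import Rule

rules = [
    Rule(filename="reports/annual_report.pdf", keywords=["revenue"]),
    Rule(filename="reports/press_release.pdf", keywords=["guidance"]),
]

result = client.query(
    question="What was the revenue in 2023?",
    index=index,
    namespace="default",
    rules=rules,
    top_k=5,
    process_rules_separately=True,  # each triggered rule issues its own query
    keyword_trigger=True,           # only the first rule fires: "revenue" appears in the question
)

print(result["answer"])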

upload_documents(index, documents, namespace, embedding_model='text-embedding-3-small', batch_size=100)

Upload documents to the index.

Parameters:

Name Type Description Default
index Index

The index.

required
documents list[str | Path]

The documents to upload.

required
namespace str

The namespace within the index to use.

required
batch_size int

The number of documents to upload at a time.

100
embedding_model str

The OpenAI embedding model to use.

'text-embedding-3-small'
Source code in whyhow_rbr/rag.py
def upload_documents(
    self,
    index: Index,
    documents: list[str | pathlib.Path],
    namespace: str,
    embedding_model: str = "text-embedding-3-small",
    batch_size: int = 100,
) -> None:
    """Upload documents to the index.

    Parameters
    ----------
    index : Index
        The index.

    documents : list[str | pathlib.Path]
        The documents to upload.

    namespace : str
        The namespace within the index to use.

    batch_size : int
        The number of documents to upload at a time.

    embedding_model : str
        The OpenAI embedding model to use.

    """
    # don't allow for duplicate documents
    documents = list(set(documents))
    if not documents:
        logger.info("No documents to upload")
        return

    logger.info(f"Parsing {len(documents)} documents")
    all_chunks: list[Document] = []
    for document in documents:
        chunks_ = parse_and_split(document)
        chunks = clean_chunks(chunks_)
        all_chunks.extend(chunks)

    logger.info(f"Embedding {len(all_chunks)} chunks")
    embeddings = generate_embeddings(
        openai_api_key=self.openai_client.api_key,
        chunks=[c.page_content for c in all_chunks],
        model=embedding_model,
    )

    if len(embeddings) != len(all_chunks):
        raise ValueError(
            "Number of embeddings does not match number of chunks"
        )

    # create PineconeDocuments
    pinecone_documents = []
    for i, (chunk, embedding) in enumerate(zip(all_chunks, embeddings)):
        metadata = PineconeMetadata(
            text=chunk.page_content,
            page_number=chunk.metadata["page"],
            chunk_number=chunk.metadata["chunk"],
            filename=chunk.metadata["source"],
        )
        pinecone_document = PineconeDocument(
            values=embedding,
            metadata=metadata,
        )
        pinecone_documents.append(pinecone_document)

    upsert_documents = [d.model_dump() for d in pinecone_documents]

    response = index.upsert(
        upsert_documents, namespace=namespace, batch_size=batch_size
    )
    n_upserted = response["upserted_count"]
    logger.info(f"Upserted {n_upserted} documents")

whyhow_rbr.rag.Rule

Bases: BaseModel

Retrieval rule.

The rule is used to filter the documents in the index.

Attributes:

Name Type Description
filename str | None

The filename of the document.

uuid str | None

The UUID of the document.

page_numbers list[int] | None

The page numbers of the document.

keywords list[str] | None

The keywords to trigger a rule.

Source code in whyhow_rbr/rag.py
class Rule(BaseModel):
    """Retrieval rule.

    The rule is used to filter the documents in the index.

    Attributes
    ----------
    filename : str | None
        The filename of the document.

    uuid : str | None
        The UUID of the document.

    page_numbers : list[int] | None
        The page numbers of the document.

    keywords : list[str] | None
        The keywords to trigger a rule.
    """

    filename: str | None = None
    uuid: str | None = None
    page_numbers: list[int] | None = None
    keywords: list[str] | None = None

    @field_validator("page_numbers", mode="before")
    @classmethod
    def convert_empty_to_none(cls, v: list[int] | None) -> list[int] | None:
        """Convert empty list to None."""
        if v is not None and not v:
            return None
        return v

    def convert_empty_str_to_none(
        cls, s: list[str] | None
    ) -> list[str] | None:
        """Convert empty string list to None."""
        if s is not None and not s:
            return None
        return s

    def to_filter(self) -> dict[str, list[dict[str, Any]]] | None:
        """Convert rule to Pinecone filter format."""
        if not any([self.filename, self.uuid, self.page_numbers]):
            return None

        conditions: list[dict[str, Any]] = []
        if self.filename is not None:
            conditions.append({"filename": {"$eq": self.filename}})
        if self.uuid is not None:
            conditions.append({"uuid": {"$eq": self.uuid}})
        if self.page_numbers is not None:
            conditions.append({"page_number": {"$in": self.page_numbers}})

        filter_ = {"$and": conditions}
        return filter_

convert_empty_str_to_none(s)

Convert empty string list to None.

Source code in whyhow_rbr/rag.py
def convert_empty_str_to_none(
    cls, s: list[str] | None
) -> list[str] | None:
    """Convert empty string list to None."""
    if s is not None and not s:
        return None
    return s

convert_empty_to_none(v) classmethod

Convert empty list to None.

Source code in whyhow_rbr/rag.py
@field_validator("page_numbers", mode="before")
@classmethod
def convert_empty_to_none(cls, v: list[int] | None) -> list[int] | None:
    """Convert empty list to None."""
    if v is not None and not v:
        return None
    return v

to_filter()

Convert rule to Pinecone filter format.

Source code in whyhow_rbr/rag.py
def to_filter(self) -> dict[str, list[dict[str, Any]]] | None:
    """Convert rule to Pinecone filter format."""
    if not any([self.filename, self.uuid, self.page_numbers]):
        return None

    conditions: list[dict[str, Any]] = []
    if self.filename is not None:
        conditions.append({"filename": {"$eq": self.filename}})
    if self.uuid is not None:
        conditions.append({"uuid": {"$eq": self.uuid}})
    if self.page_numbers is not None:
        conditions.append({"page_number": {"$in": self.page_numbers}})

    filter_ = {"$and": conditions}
    return filter_
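
Example: a quick sketch of the Pinecone filter a rule expands to; the values are placeholders.

from whyhow_rbr.rag import Rule

rule = Rule(filename="annual_report.pdf", page_numbers=[1, 2])
print(rule.to_filter())
# {'$and': [{'filename': {'$eq': 'annual_report.pdf'}},
#           {'page_number': {'$in': [1, 2]}}]}

print(Rule(keywords=["revenue"]).to_filter())
# None -- keywords alone do not produce a metadata filter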

whyhow_rbr.rag.PineconeMetadata

Bases: BaseModel

The metadata to be stored in Pinecone.

Attributes:

Name Type Description
text str

The text of the document.

page_number int

The page number of the document.

chunk_number int

The chunk number of the document.

filename str

The filename of the document.

uuid str

The UUID of the document. Note that this is not required to be provided when creating the metadata. It is generated automatically when creating the PineconeDocument.

Source code in whyhow_rbr/rag.py
class PineconeMetadata(BaseModel, extra="forbid"):
    """The metadata to be stored in Pinecone.

    Attributes
    ----------
    text : str
        The text of the document.

    page_number : int
        The page number of the document.

    chunk_number : int
        The chunk number of the document.

    filename : str
        The filename of the document.

    uuid : str
        The UUID of the document. Note that this is not required to be
        provided when creating the metadata. It is generated automatically
        when creating the PineconeDocument.
    """

    text: str
    page_number: int
    chunk_number: int
    filename: str
    uuid: str = Field(default_factory=lambda: str(uuid.uuid4()))

whyhow_rbr.rag.PineconeDocument

Bases: BaseModel

The actual document to be stored in Pinecone.

Attributes:

Name Type Description
metadata PineconeMetadata

The metadata of the document.

values list[float] | None

The embedding of the document. None is used when querying the index, since the values are not needed; at upsert time the values are required.

id str | None

The human-readable identifier of the document. This is generated automatically when creating the PineconeDocument unless it is provided.

Source code in whyhow_rbr/rag.py
class PineconeDocument(BaseModel, extra="forbid"):
    """The actual document to be stored in Pinecone.

    Attributes
    ----------
    metadata : PineconeMetadata
        The metadata of the document.

    values : list[float] | None
        The embedding of the document. The None is used when querying
        the index since the values are not needed. At upsert time, the
        values are required.

    id : str | None
        The human-readable identifier of the document. This is generated
        automatically when creating the PineconeDocument unless it is
        provided.

    """

    metadata: PineconeMetadata
    values: list[float] | None = None
    id: str | None = None

    @model_validator(mode="after")
    def generate_human_readable_id(self) -> "PineconeDocument":
        """Generate a human-readable identifier for the document."""
        if self.id is None:
            meta = self.metadata
            hr_id = f"{meta.filename}-{meta.page_number}-{meta.chunk_number}"
            self.id = hr_id

        return self

generate_human_readable_id()

Generate a human-readable identifier for the document.

Source code in whyhow_rbr/rag.py
@model_validator(mode="after")
def generate_human_readable_id(self) -> "PineconeDocument":
    """Generate a human-readable identifier for the document."""
    if self.id is None:
        meta = self.metadata
        hr_id = f"{meta.filename}-{meta.page_number}-{meta.chunk_number}"
        self.id = hr_id

    return self
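
Example: a sketch showing the automatically generated human-readable id; the embedding values are placeholders.

from whyhow_rbr.rag import PineconeDocument, PineconeMetadata

doc = PineconeDocument(
    values=[0.1, 0.2, 0.3],  # placeholder vector; real embeddings match the index dimension
    metadata=PineconeMetadata(
        text="Revenue grew 12% year over year.",
        page_number=3,
        chunk_number=0,
        filename="annual_report.pdf",
    ),
)
print(doc.id)  # annual_report.pdf-3-0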

whyhow_rbr.rag.PineconeMatch

Bases: BaseModel

The match returned from Pinecone.

Attributes:

Name Type Description
id str

The ID of the document.

score float

The score of the match. Its meaning depends on the metric used for the index.

metadata PineconeMetadata

The metadata of the document.

Source code in whyhow_rbr/rag.py
class PineconeMatch(BaseModel, extra="ignore"):
    """The match returned from Pinecone.

    Attributes
    ----------
    id : str
        The ID of the document.

    score : float
        The score of the match. Its meaning depends on the metric used for
        the index.

    metadata : PineconeMetadata
        The metadata of the document.

    """

    id: str
    score: float
    metadata: PineconeMetadata

whyhow_rbr.rag.Input

Bases: BaseModel

Example input for the prompt.

Attributes:

Name Type Description
question str

The question to ask.

contexts list[str]

The contexts to use for answering the question.

Source code in whyhow_rbr/rag.py
class Input(BaseModel):
    """Example input for the prompt.

    Attributes
    ----------
    question : str
        The question to ask.

    contexts : list[str]
        The contexts to use for answering the question.
    """

    question: str
    contexts: list[str]

whyhow_rbr.rag.Output

Bases: BaseModel

Example output for the prompt.

Attributes:

Name Type Description
answer str

The answer to the question.

contexts list[int]

The indices of the contexts that were used to answer the question.

Source code in whyhow_rbr/rag.py
class Output(BaseModel):
    """Example output for the prompt.

    Attributes
    ----------
    answer : str
        The answer to the question.

    contexts : list[int]
        The indices of the contexts that were used to answer the question.
    """

    answer: str
    contexts: list[int]
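
Example: Input and Output are plain pydantic models; a small sketch of the JSON round-trip used when building the prompt and parsing the chat completion. The strings are placeholders.

from whyhow_rbr.rag import Input, Output

example_input = Input(
    question="What was the revenue in 2023?",
    contexts=["Revenue grew 12% year over year.", "The CEO resigned in March."],
)
print(example_input.model_dump_json())

raw = '{"answer": "Revenue grew 12% year over year.", "contexts": [0]}'
output = Output.model_validate_json(raw)
print(output.answer, output.contexts)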
